下面列出了怎么用io.reactivex.rxjava3.functions.Consumer的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Create an observable from the given event source.
*
* @param eventSource the eventSource you want to convert to an observable
* @param <E> the event type
* @return an Observable based on the provided event source
*/
@NonNull
public static <E> Observable<E> toObservable(final EventSource<E> eventSource) {
return Observable.create(
new ObservableOnSubscribe<E>() {
@Override
public void subscribe(@NonNull ObservableEmitter<E> emitter) throws Throwable {
final com.spotify.mobius.disposables.Disposable disposable =
eventSource.subscribe(
new com.spotify.mobius.functions.Consumer<E>() {
@Override
public void accept(E value) {
emitter.onNext(value);
}
});
emitter.setCancellable(
new Cancellable() {
@Override
public void cancel() throws Throwable {
disposable.dispose();
}
});
}
});
}
/**
* Optionally set a shared error handler in case a handler throws an uncaught exception.
*
* <p>The default is to use {@link RxJavaPlugins#onError(Throwable)}. Note that any exception
* thrown by a handler is a fatal error and this method doesn't enable safe error handling, only
* configurable crash reporting.
*
* @param function a function that gets told which sub-transformer failed and should return an
* appropriate handler for exceptions thrown.
*/
public RxMobius.SubtypeEffectHandlerBuilder<F, E> withFatalErrorHandler(
final Function<ObservableTransformer<? extends F, E>, Consumer<Throwable>> function) {
checkNotNull(function);
this.onErrorFunction =
new OnErrorFunction<ObservableTransformer<? extends F, E>, Consumer<Throwable>>() {
@Override
public Consumer<Throwable> apply(ObservableTransformer<? extends F, E> effectHandler) {
try {
return function.apply(effectHandler);
} catch (Throwable e) {
throw new RuntimeException(
"FATAL: fatal error handler threw exception for effect handler: "
+ effectHandler,
e);
}
}
};
return this;
}
/**
* Creates an {@link ObservableTransformer} that will flatten the provided {@link Consumer} into
* the stream as a {@link Completable} every time it receives an effect from the upstream effects
* observable. This will result in calling the consumer on the specified scheduler, and passing it
* the requested effect object.
*
* @param doEffect the {@link Consumer} to be run every time the effect is requested
* @param scheduler the {@link Scheduler} to be used when invoking the consumer
* @param <F> the type of Effect this transformer handles
* @param <E> these transformers are for effects that do not result in any events; however, they
* still need to share the same Event type
* @return an {@link ObservableTransformer} that can be used with a {@link
* RxMobius.SubtypeEffectHandlerBuilder}.
*/
static <F, E> ObservableTransformer<F, E> fromConsumer(
final Consumer<F> doEffect, @Nullable final Scheduler scheduler) {
return new ObservableTransformer<F, E>() {
@Override
public ObservableSource<E> apply(Observable<F> effectStream) {
return effectStream
.flatMapCompletable(
new Function<F, CompletableSource>() {
@Override
public CompletableSource apply(final F effect) {
Completable completable =
Completable.fromAction(
new Action() {
@Override
public void run() throws Throwable {
doEffect.accept(effect);
}
});
return scheduler == null ? completable : completable.subscribeOn(scheduler);
}
})
.toObservable();
}
};
}
@Test
public void consumerTest(final MockTracer tracer) {
final Observable<Integer> observable = createSequentialObservable(tracer, false);
final List<Integer> result = new ArrayList<>();
final Consumer<Integer> onNext = consumer(result);
final Disposable disposable = observable.subscribe(onNext);
logger.fine(String.valueOf(disposable));
assertEquals(5, result.size());
final List<MockSpan> spans = tracer.finishedSpans();
assertEquals(1, spans.size());
assertNull(tracer.scopeManager().active());
}
@Test
public void consumerTest2(final MockTracer tracer) {
final Observable<Integer> observable = createSequentialObservable(tracer, false);
final List<Integer> result = new ArrayList<>();
final Consumer<Integer> onNext = consumer(result);
final List<String> errorList = new ArrayList<>();
observable.subscribe(onNext, onError(errorList));
assertEquals(5, result.size());
final List<MockSpan> spans = tracer.finishedSpans();
assertEquals(1, spans.size());
assertTrue(errorList.isEmpty());
assertNull(tracer.scopeManager().active());
}
@Test
public void consumerTest3(final MockTracer tracer) {
final Observable<Integer> observable = createSequentialObservable(tracer, false);
final List<Integer> result = new ArrayList<>();
final Consumer<Integer> onNext = consumer(result);
final List<String> completeList = new ArrayList<>();
final List<String> errorList = new ArrayList<>();
observable.subscribe(onNext, onError(errorList), onComplete(completeList, tracer));
assertEquals(5, result.size());
assertTrue(completeList.contains(COMPLETED));
assertEquals(1, completeList.size());
assertTrue(errorList.isEmpty());
final List<MockSpan> spans = tracer.finishedSpans();
assertEquals(1, spans.size());
assertNull(tracer.scopeManager().active());
}
@Test
public void consumerTest3WithError(final MockTracer tracer) {
final Observable<Integer> observable = createSequentialObservable(tracer, true);
final List<Integer> result = new ArrayList<>();
final Consumer<Integer> onNext = consumer(result);
final List<String> completeList = new ArrayList<>();
final List<String> errorList = new ArrayList<>();
observable.subscribe(onNext, onError(errorList), onComplete(completeList, tracer));
assertEquals(0, result.size());
assertEquals(0, completeList.size());
assertEquals(1, errorList.size());
final List<MockSpan> spans = tracer.finishedSpans();
assertEquals(1, spans.size());
assertNull(tracer.scopeManager().active());
}
public static void main(String[] args) {
ApplicationContext ctx = new AnnotationConfigApplicationContext(ConfigWithGuava.class);
RxCache rxCache = (RxCache) ctx.getBean("rxCache");
User u = new User();
u.name = "tony";
u.password = "123456";
rxCache.save("test",u);
Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);
observable.subscribe(new Consumer<Record<User>>() {
@Override
public void accept(Record<User> record) throws Exception {
User user = record.getData();
System.out.println(user.name);
System.out.println(user.password);
}
});
}
public static void main(String[] args) {
ApplicationContext ctx = new AnnotationConfigApplicationContext(Config.class);
RxCache rxCache = (RxCache) ctx.getBean("rxCache");
User u = new User();
u.name = "tony";
u.password = "123456";
rxCache.save("test",u);
Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);
observable.subscribe(new Consumer<Record<User>>() {
@Override
public void accept(Record<User> record) throws Exception {
User user = record.getData();
System.out.println(user.name);
System.out.println(user.password);
}
});
}
public static void main(String[] args) {
RxCache.config(new RxCache.Builder().memory(new MapDBImpl(100)));
RxCache rxCache = RxCache.getRxCache();
User u = new User();
u.name = "tony";
u.password = "123456";
rxCache.save("test",u);
Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);
observable.subscribe(new Consumer<Record<User>>() {
@Override
public void accept(Record<User> record) throws Exception {
User user = record.getData();
System.out.println(user.name);
System.out.println(user.password);
}
});
}
public static void main(String[] args) {
RxCache.config(new RxCache.Builder().memory(new CaffeineImpl(100)));
RxCache rxCache = RxCache.getRxCache();
User u = new User();
u.name = "tony";
u.password = "123456";
rxCache.save("test",u);
Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);
observable.subscribe(new Consumer<Record<User>>() {
@Override
public void accept(Record<User> record) throws Exception {
User user = record.getData();
System.out.println(user.name);
System.out.println(user.password);
}
});
}
public static void main(String[] args) {
RxCache.config(new RxCache.Builder().memory(new GuavaCacheImpl(100)));
RxCache rxCache = RxCache.getRxCache();
User u = new User();
u.name = "tony";
u.password = "123456";
rxCache.save("test",u);
Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);
observable.subscribe(new Consumer<Record<User>>() {
@Override
public void accept(Record<User> record) throws Exception {
User user = record.getData();
System.out.println(user.name);
System.out.println(user.password);
}
});
}
private static void testObject(RxCache rxCache) {
User u = new User();
u.name = "tony";
u.password = "123456";
rxCache.save("test", u);
Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);
observable.subscribe(new Consumer<Record<User>>() {
@Override
public void accept(Record<User> record) throws Exception {
User user = record.getData();
System.out.println(user.name);
System.out.println(user.password);
}
});
}
public static void testObject(RxCache rxCache) {
User u = new User();
u.name = "tony";
u.password = "123456";
rxCache.save("test",u);
Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);
observable.subscribe(new Consumer<Record<User>>() {
@Override
public void accept(Record<User> record) throws Exception {
User user = record.getData();
System.out.println(user.name);
System.out.println(user.password);
}
});
}
public static void main(String[] args) {
RxCache.config(new RxCache.Builder());
RxCache rxCache = RxCache.getRxCache();
User u = new User();
u.name = "tony";
u.password = "123456";
rxCache.save("test",u);
Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);
observable.subscribe(new Consumer<Record<User>>() {
@Override
public void accept(Record<User> record) throws Exception {
User user = record.getData();
System.out.println(user.name);
System.out.println(user.password);
}
});
}
@SuppressWarnings("CheckReturnValue")
@Test public void fatalExceptionDuringReplayThrown() {
PublishProcessor<String> subject = PublishProcessor.create();
Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());
flowable.subscribe();
subject.onNext("Foo");
Consumer<String> brokenAction = new Consumer<String>() {
@Override public void accept(String s) {
throw new OutOfMemoryError("broken!");
}
};
try {
flowable.subscribe(brokenAction);
fail();
} catch (OutOfMemoryError e) {
assertEquals("broken!", e.getMessage());
}
}
@SuppressWarnings("CheckReturnValue")
@Test public void fatalExceptionDuringReplayThrown() {
PublishSubject<String> subject = PublishSubject.create();
Observable<String> observable = subject.compose(ReplayingShare.<String>instance());
observable.subscribe();
subject.onNext("Foo");
Consumer<String> brokenAction = new Consumer<String>() {
@Override public void accept(String s) {
throw new OutOfMemoryError("broken!");
}
};
try {
observable.subscribe(brokenAction);
fail();
} catch (OutOfMemoryError e) {
assertEquals("broken!", e.getMessage());
}
}
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
Log.d(TAG, "onCreate()");
setContentView(R.layout.activity_main);
// Specifically bind this until onPause()
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.i(TAG, "Unsubscribing subscription from onCreate()");
}
})
.compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Log.i(TAG, "Started in onCreate(), running until onPause(): " + num);
}
});
}
@Override
protected void onStart() {
super.onStart();
Log.d(TAG, "onStart()");
// Using automatic unsubscription, this should determine that the correct time to
// unsubscribe is onStop (the opposite of onStart).
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.i(TAG, "Unsubscribing subscription from onStart()");
}
})
.compose(this.<Long>bindToLifecycle())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Log.i(TAG, "Started in onStart(), running until in onStop(): " + num);
}
});
}
@Override
protected void onResume() {
super.onResume();
Log.d(TAG, "onResume()");
// `this.<Long>` is necessary if you're compiling on JDK7 or below.
//
// If you're using JDK8+, then you can safely remove it.
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.i(TAG, "Unsubscribing subscription from onResume()");
}
})
.compose(this.<Long>bindUntilEvent(ActivityEvent.DESTROY))
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Log.i(TAG, "Started in onResume(), running until in onDestroy(): " + num);
}
});
}
/**
* Create an event source from the given RxJava streams.
*
* <p>All streams must be mapped to your event type.
*
* @param sources the observables you want to include in this event source
* @param <E> the event type
* @return an EventSource based on the provided observables
*/
@SafeVarargs
public static <E> EventSource<E> fromObservables(@NonNull final ObservableSource<E>... sources) {
final Observable<E> eventSource = Observable.mergeArray(sources);
return new EventSource<E>() {
@Nonnull
@Override
public com.spotify.mobius.disposables.Disposable subscribe(
com.spotify.mobius.functions.Consumer<E> eventConsumer) {
final Disposable disposable =
eventSource.subscribe(
new Consumer<E>() {
@Override
public void accept(E value) throws Throwable {
eventConsumer.accept(value);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable error) throws Throwable {
RxJavaPlugins.onError(error);
}
});
return new com.spotify.mobius.disposables.Disposable() {
@Override
public void dispose() {
disposable.dispose();
}
};
}
};
}
private static <F, E> Consumer<Throwable> defaultOnError(
final ObservableTransformer<? extends F, E> effectHandler) {
return new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Throwable {
RxJavaPlugins.onError(
new ConnectionException(
"in effect handler: " + effectHandler.getClass().toString(), throwable));
}
};
}
@SuppressWarnings("unchecked")
public static Object enter(final Object thiz, final int argc, final Object arg0, final Object arg1, final Object arg2) {
if (arg0 == null || arg0.getClass().getName().startsWith("io.reactivex.rxjava3.internal") || arg0 instanceof TracingConsumer)
return NULL;
if (!isTracingEnabled) {
isTracingEnabled = true;
TracingRxJava3Utils.enableTracing();
}
if (arg0 instanceof Observer)
return new TracingObserver<>((Observer<?>)arg0, "observer", GlobalTracer.get());
if (!(arg0 instanceof Consumer))
return NULL;
final TracingConsumer<Object> tracingConsumer;
if (argc == 1)
tracingConsumer = new TracingConsumer<>((Consumer<Object>)arg0, "consumer", GlobalTracer.get());
else if (argc == 2)
tracingConsumer = new TracingConsumer<>((Consumer<Object>)arg0, (Consumer<Throwable>)arg1, "consumer", GlobalTracer.get());
else if (argc == 3)
tracingConsumer = new TracingConsumer<>((Consumer<Object>)arg0, (Consumer<Throwable>)arg1, (Action)arg2, "consumer", GlobalTracer.get());
else
tracingConsumer = null;
if (tracingConsumer != null)
((Observable<Object>)thiz).subscribe(tracingConsumer);
return null;
}
@Test
public void consumerTest4(final MockTracer tracer) {
final Observable<Integer> observable = createSequentialObservable(tracer, false);
final List<Integer> result = new ArrayList<>();
final Consumer<Integer> onNext = consumer(result);
final List<String> subscribeList = new ArrayList<>();
final String subscribed = "subscribed";
final Consumer<Object> onSubscribe = new Consumer<Object>() {
@Override
public void accept(final Object t) {
subscribeList.add(subscribed);
}
};
final List<String> completeList = new ArrayList<>();
final List<String> errorList = new ArrayList<>();
observable.doOnSubscribe(onSubscribe).subscribe(onNext, onError(errorList), onComplete(completeList, tracer));
assertEquals(5, result.size());
assertEquals(1, completeList.size());
assertTrue(completeList.contains(COMPLETED));
assertEquals(1, subscribeList.size());
assertTrue(subscribeList.contains(subscribed));
assertTrue(errorList.isEmpty());
final List<MockSpan> spans = tracer.finishedSpans();
assertEquals(1, spans.size());
assertNull(tracer.scopeManager().active());
}
@Test
public void consumerTest4WithError(final MockTracer tracer) {
final Observable<Integer> observable = createSequentialObservable(tracer, true);
final List<Integer> result = new ArrayList<>();
final Consumer<Integer> onNext = consumer(result);
final List<String> subscribeList = new ArrayList<>();
final Consumer<Object> onSubscribe = new Consumer<Object>() {
@Override
public void accept(final Object t) {
subscribeList.add("subscribed");
}
};
final List<String> completeList = new ArrayList<>();
final List<String> errorList = new ArrayList<>();
observable.doOnSubscribe(onSubscribe).subscribe(onNext, onError(errorList), onComplete(completeList, tracer));
assertEquals(0, result.size());
assertEquals(0, completeList.size());
assertEquals(1, subscribeList.size());
assertEquals(1, errorList.size());
final List<MockSpan> spans = tracer.finishedSpans();
assertEquals(1, spans.size());
assertNull(tracer.scopeManager().active());
}
private static <T>Consumer<T> consumer(final List<T> result) {
return new Consumer<T>() {
@Override
public void accept(final T t) {
logger.fine(String.valueOf(t));
result.add(t);
}
};
}
private static Consumer<Throwable> onError(final List<String> errorList) {
return new Consumer<Throwable>() {
@Override
public void accept(final Throwable t) {
errorList.add("error");
}
};
}
private static void testList(RxCache rxCache) {
List<User> list = new ArrayList<>();
User u1 = new User();
u1.name = "tonyList1";
u1.password = "list1123456";
list.add(u1);
User u2 = new User();
u2.name = "tonyList12";
u2.password = "list12345";
list.add(u2);
rxCache.save("list", list);
Type type = TypeBuilder
.newInstance(List.class)
.addTypeParam(User.class)
.build();
Observable<Record<List<User>>> observable = rxCache.load2Observable("list", type);
observable.subscribe(new Consumer<Record<List<User>>>() {
@Override
public void accept(Record<List<User>> record) throws Exception {
List<User> recordDataList = record.getData();
if (Preconditions.isNotBlank(recordDataList)) {
for (User user : recordDataList) {
System.out.println(user.name);
System.out.println(user.password);
}
}
}
});
}
private static void testSet(RxCache rxCache) {
Set<User> set = new HashSet<>();
User u1 = new User();
u1.name = "tonySet1";
u1.password = "set1123456";
set.add(u1);
User u2 = new User();
u2.name = "tonySet12";
u2.password = "set12345";
set.add(u2);
rxCache.save("set", set);
Type type = TypeBuilder
.newInstance(Set.class)
.addTypeParam(User.class)
.build();
Observable<Record<Set<User>>> observable = rxCache.load2Observable("set", type);
observable.subscribe(new Consumer<Record<Set<User>>>() {
@Override
public void accept(Record<Set<User>> record) throws Exception {
Set<User> recordDataList = record.getData();
if (Preconditions.isNotBlank(recordDataList)) {
for (User user : recordDataList) {
System.out.println(user.name);
System.out.println(user.password);
}
}
}
});
}
public static void testList(RxCache rxCache) {
List<User> list = new ArrayList<>();
User u1 = new User();
u1.name = "tonyList1";
u1.password = "list1123456";
list.add(u1);
User u2 = new User();
u2.name = "tonyList12";
u2.password = "list12345";
list.add(u2);
rxCache.save("list", list);
Type type = TypeBuilder
.newInstance(List.class)
.addTypeParam(User.class)
.build();
Observable<Record<List<User>>> observable = rxCache.load2Observable("list", type);
observable.subscribe(new Consumer<Record<List<User>>>() {
@Override
public void accept(Record<List<User>> record) throws Exception {
List<User> recordDataList = record.getData();
if (Preconditions.isNotBlank(recordDataList)) {
for (User user : recordDataList) {
System.out.println(user.name);
System.out.println(user.password);
}
}
}
});
}