下面列出了 io.reactivex.rxjava3.core.Observable 类的 API 用法示例代码,也可以点击链接到 GitHub 查看源代码。
/**
 * Registers an {@link ObservableTransformer} that handles every effect of the given class,
 * including instances of its subclasses.
 *
 * <p>Two registered effect classes must never be assignable to one another: registering a class
 * together with one of its super-classes, or registering the same class twice, is rejected as a
 * collision.
 *
 * @param effectClass the class to handle
 * @param effectHandler the effect handler for the given effect class
 * @param <G> the effect class as a type parameter
 * @return this builder
 * @throws IllegalArgumentException if there is a handler collision
 */
public <G extends F> RxMobius.SubtypeEffectHandlerBuilder<F, E> addTransformer(
    final Class<G> effectClass, final ObservableTransformer<G, E> effectHandler) {
  checkNotNull(effectClass);
  checkNotNull(effectHandler);

  // Reject the registration when the new class is related, in either direction, to a class
  // that already has a handler.
  for (Class<?> registered : effectPerformerMap.keySet()) {
    final boolean collides =
        effectClass.isAssignableFrom(registered) || registered.isAssignableFrom(effectClass);
    if (!collides) {
      continue;
    }
    throw new IllegalArgumentException(
        "Effect classes may not be assignable to each other, collision found: "
            + effectClass.getSimpleName()
            + " <-> "
            + registered.getSimpleName());
  }

  effectPerformerMap.put(
      effectClass,
      (Observable<F> effects) -> {
        // Only effects of the registered type are forwarded to the handler.
        final Observable<G> matching = effects.ofType(effectClass);
        return matching.compose(effectHandler).doOnError(onErrorFunction.apply(effectHandler));
      });
  return this;
}
/**
 * Builds an {@link ObservableTransformer} that runs the given {@link Consumer} once for every
 * effect received from the upstream effects observable. Each invocation is wrapped in a {@link
 * Completable} and flattened into the stream; when a scheduler is supplied the invocation is
 * subscribed on it, otherwise no scheduler is applied.
 *
 * @param doEffect the {@link Consumer} to be run every time the effect is requested
 * @param scheduler the {@link Scheduler} to invoke the consumer on, or {@code null} to apply none
 * @param <F> the type of Effect this transformer handles
 * @param <E> the Event type; these transformers are for effects that do not result in any
 *     events, but they still need to share the same Event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromConsumer(
    final Consumer<F> doEffect, @Nullable final Scheduler scheduler) {
  return effectStream ->
      effectStream
          .flatMapCompletable(
              effect -> {
                final Completable invocation =
                    Completable.fromAction(() -> doEffect.accept(effect));
                if (scheduler == null) {
                  return invocation;
                }
                return invocation.subscribeOn(scheduler);
              })
          .<E>toObservable();
}
/**
 * Logs the callback and starts a one-second interval stream that is composed with
 * {@code bindToLifecycle()}; each emitted value and the disposal of the stream are logged.
 */
@Override
protected void onStart() {
    super.onStart();
    Log.d(TAG, "onStart()");

    // Using automatic unsubscription, this should determine that the correct time to
    // unsubscribe is onStop (the opposite of onStart).
    final Observable<Long> ticks = Observable.interval(1, TimeUnit.SECONDS);
    ticks.doOnDispose(() -> Log.i(TAG, "Unsubscribing subscription from onStart()"))
            .compose(this.<Long>bindToLifecycle())
            .subscribe(num -> Log.i(TAG, "Started in onStart(), running until in onStop(): " + num));
}
/**
 * Pushes the effects {@code "Hello"} and {@code "Rx"} through a transformer built by
 * {@code Transformers.fromFunction} on {@code Schedulers.io()}. The function sleeps for
 * {@code duration(s)} before returning the string's length. The test then waits, for at most
 * {@code durationForEffects(effects)}, until the collected results equal
 * {@code expected(effects)}.
 */
@Test
public void processingLongEffectsDoesNotBlockProcessingShorterEffects() {
  final List<String> effects = Arrays.asList("Hello", "Rx");
  PublishSubject<String> upstream = PublishSubject.create();
  Function<String, Integer> sleepyFunction =
      s -> {
        try {
          Thread.sleep(duration(s));
        } catch (InterruptedException ie) {
          // Do not swallow the interruption: restore the flag so the thread that runs this
          // function can still observe that it was interrupted.
          Thread.currentThread().interrupt();
        }
        return s.length();
      };
  final List<Integer> results = new ArrayList<>();
  upstream
      .compose(Transformers.fromFunction(sleepyFunction, Schedulers.io()))
      .subscribe(results::add);
  Observable.fromIterable(effects).subscribe(upstream);
  await().atMost(durationForEffects(effects)).until(() -> results.equals(expected(effects)));
}
/**
 * Builds a loop transformer with {@code RxMobius.loopFrom} from a builder that has an
 * {@link Init} configured, passing {@code effects(true)} as well, and asserts that the resulting
 * stream terminates with an error whose message contains "cannot pass in start effects".
 */
@Test
public void shouldThrowIfStartingALoopWithInitAndStartEffects() throws Exception {
  final Init<String, Boolean> appendInitSuffix =
      new Init<String, Boolean>() {
        @Nonnull
        @Override
        public First<String, Boolean> init(String model) {
          return First.first(model + "-init");
        }
      };
  final MobiusLoop.Builder<String, Integer, Boolean> builderWithInit =
      builder.init(appendInitSuffix);

  final ObservableTransformer<Integer, String> loop =
      RxMobius.loopFrom(builderWithInit, "hi", effects(true));

  Observable.just(10)
      .compose(loop)
      .test()
      .assertError(error -> error.getMessage().contains("cannot pass in start effects"));
}
/**
 * Saves a sample {@link User} under the key {@code "test"}, loads it back as an observable of
 * records and prints the loaded user's name and password to standard output.
 *
 * @param rxCache the cache used for saving and loading
 */
private static void testObject(RxCache rxCache) {
    final User sample = new User();
    sample.name = "tony";
    sample.password = "123456";
    rxCache.save("test", sample);

    final Observable<Record<User>> loaded = rxCache.load2Observable("test", User.class);
    loaded.subscribe(record -> {
        final User cached = record.getData();
        System.out.println(cached.name);
        System.out.println(cached.password);
    });
}
/**
 * Requests {@code /rx3/observable/textJson} as JSON through a {@link WebClient} configured with
 * the Jackson and Observable Rx invoker providers, stores the emitted bean, waits two seconds
 * and then checks the greeting and audience of the stored bean.
 */
@Test
public void testGetHelloWorldJson() throws Exception {
    final List<Object> providers = new LinkedList<>();
    providers.add(new JacksonJsonProvider());
    providers.add(new ObservableRxInvokerProvider());

    final String address = "http://localhost:" + PORT + "/rx3/observable/textJson";
    final WebClient client = WebClient.create(address, providers);
    final Observable<HelloWorldBean> obs = client.accept("application/json")
        .rx(ObservableRxInvoker.class)
        .get(HelloWorldBean.class);

    final Holder<HelloWorldBean> received = new Holder<>();
    final Disposable subscription = obs.subscribe(bean -> received.value = bean);
    if (subscription == null) {
        throw new IllegalStateException("Subscribe did not return a Disposable");
    }

    // Give the asynchronous call time to deliver its result before asserting.
    Thread.sleep(2000);
    assertEquals("Hello", received.value.getGreeting());
    assertEquals("World", received.value.getAudience());
}
/**
 * Subscribes with onNext, onError and onComplete callbacks to the observable returned by
 * {@code createSequentialObservable(tracer, true)} and asserts that no values and no completion
 * were recorded, exactly one error was recorded, one span was finished and no scope is active.
 */
@Test
public void consumerTest3WithError(final MockTracer tracer) {
    final Observable<Integer> source = createSequentialObservable(tracer, true);

    final List<Integer> received = new ArrayList<>();
    final Consumer<Integer> onNext = consumer(received);
    final List<String> completions = new ArrayList<>();
    final List<String> errors = new ArrayList<>();
    source.subscribe(onNext, onError(errors), onComplete(completions, tracer));

    assertEquals(0, received.size());
    assertEquals(0, completions.size());
    assertEquals(1, errors.size());

    final List<MockSpan> finished = tracer.finishedSpans();
    assertEquals(1, finished.size());
    assertNull(tracer.scopeManager().active());
}
/**
 * Demo entry point: obtains the {@code "rxCache"} bean from an annotation-configured application
 * context, saves a sample {@link User} under the key {@code "test"}, loads it back and prints
 * the loaded user's name and password to standard output.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    final ApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
    final RxCache rxCache = (RxCache) context.getBean("rxCache");

    final User sample = new User();
    sample.name = "tony";
    sample.password = "123456";
    rxCache.save("test", sample);

    final Observable<Record<User>> loaded = rxCache.load2Observable("test", User.class);
    loaded.subscribe(record -> {
        final User cached = record.getData();
        System.out.println(cached.name);
        System.out.println(cached.password);
    });
}
/**
 * Queues a request on the {@code GET_REACTIONS} route for the given channel and maps the
 * response payload to a JSON array.
 *
 * @param channelId major parameter of the route
 * @param messageId value of the {@code "message"} route parameter
 * @param emoji value of the {@code "emojis"} route parameter, passed through {@code encodeUTF8}
 * @param before sent as the {@code before} query parameter when not {@code null}
 * @param after sent as the {@code after} query parameter when not {@code null}
 * @param limit sent as the {@code limit} query parameter when greater than zero
 * @return an observable of the response payload as a JSON array
 */
@Nonnull
@CheckReturnValue
public Observable<JsonArray> getReactionsRaw(@Nonnull final String channelId, @Nonnull final String messageId,
                                             @Nonnull final String emoji, @Nullable final String before,
                                             @Nullable final String after, @Nonnegative final int limit) {
    final QueryStringBuilder queryBuilder = new QueryStringBuilder();
    if (limit > 0) {
        queryBuilder.append("limit", String.valueOf(limit));
    }
    if (before != null) {
        queryBuilder.append("before", before);
    }
    if (after != null) {
        queryBuilder.append("after", after);
    }

    final OutboundRequest request = new OutboundRequest(
            Routes.GET_REACTIONS.withMajorParam(channelId).withQueryString(queryBuilder.build()),
            Map.of("message", messageId, "emojis", encodeUTF8(emoji)));
    return catnip().requester()
            .queue(request)
            .map(ResponsePayload::array);
}
/**
 * Concatenates the cached record for {@code key} with the records produced by {@code source},
 * delaying errors until both have terminated, and drops records whose data is {@code null}.
 * Every value emitted by {@code source} is saved to the cache under {@code key} before it is
 * wrapped in a {@link Record} marked {@code Source.CLOUD}.
 *
 * @param rxCache the cache to read from and write to
 * @param key the cache key
 * @param source the observable providing fresh values
 * @param type the type passed to the cache lookup
 * @return an observable of the non-empty records, cache first and then source
 */
@Override
public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {
    final Observable<Record<T>> cached = rxCache.<T>load2Observable(key, type);

    final Observable<Record<T>> fetched = source.map(value -> {
        rxCache.save(key, value);
        return new Record<T>(Source.CLOUD, key, value);
    });

    return Observable.concatDelayError(Arrays.asList(cached, fetched))
            .filter(record -> record.getData() != null);
}
/**
 * Subscribes to the given click observable on the computation scheduler. Each emitted point
 * becomes the new {@code startingPoint} and triggers {@code runPixelTraversal()}. The resulting
 * subscription is stored in {@code disposable}.
 *
 * @param observable the source of clicked points
 */
void setClickObservable(Observable<Point> observable) {
    logger.info("Setting observable");
    // NOTE(review): a subscription stored by an earlier call is overwritten without being
    // disposed here - confirm whether callers invoke this more than once.
    final Observable<Point> clicksOnComputation = observable.subscribeOn(Schedulers.computation());
    disposable = clicksOnComputation.subscribe(clicked -> {
        startingPoint = clicked;
        runPixelTraversal();
    });
}
/**
 * Fetches the raw webhook data for a guild via {@code getGuildWebhooksRaw}, converts each raw
 * result with {@code mapObjectContents} and the entity builder's {@code createWebhook}, and
 * emits the converted webhooks one by one.
 *
 * @param guildId the id passed to {@code getGuildWebhooksRaw}
 * @return an observable emitting the converted webhooks
 */
@Nonnull
@CheckReturnValue
public Observable<Webhook> getGuildWebhooks(@Nonnull final String guildId) {
    return getGuildWebhooksRaw(guildId)
            .map(rawWebhooks -> mapObjectContents(entityBuilder()::createWebhook).apply(rawWebhooks))
            .flatMapIterable(webhooks -> webhooks);
}
/**
 * With a one-minute time limiter configuration stubbed in, an empty observable composed with
 * {@code TimeLimiterTransformer} must complete, and the time limiter must be notified through
 * {@code onSuccess()}.
 */
@Test
public void doNotTimeoutEmpty() {
    given(timeLimiter.getTimeLimiterConfig()).willReturn(toConfig(Duration.ofMinutes(1)));

    Observable.empty()
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test()
        .assertComplete();

    then(timeLimiter).should().onSuccess();
}
/**
 * Wraps {@code getState().exportRDB(output)} in an observable that completes right after the
 * export call returns, passes it through {@code executeOn} and blocks until it terminates.
 *
 * @param output the stream handed to the state's {@code exportRDB}
 */
@Override
public void exportRDB(OutputStream output) {
    executeOn(Observable.create(emitter -> {
        getState().exportRDB(output);
        // No items are emitted; completion alone signals that the export has finished.
        emitter.onComplete();
    })).blockingSubscribe();
}
/**
 * Passes two observables over {@code allCountries} to {@code sumPopulationOfCountries} and
 * expects a single result equal to twice the provider's total population, with no errors.
 */
@Test
public void rx_sumPopulationOfCountries() {
    // hint: use "map" operator
    TestObserver<Long> sumObserver = countriesService
        .sumPopulationOfCountries(Observable.fromIterable(allCountries), Observable.fromIterable(allCountries))
        .test();

    final long doubledTotal = CountriesTestProvider.sumPopulationOfAllCountries()
        + CountriesTestProvider.sumPopulationOfAllCountries();
    sumObserver.assertResult(doubledTotal);
    sumObserver.assertNoErrors();
}
/**
 * Queues a request on the {@code LIST_GUILD_EMOJIS} route for the given guild and maps the
 * response payload to a JSON array.
 *
 * @param guildId major parameter of the route
 * @return an observable of the response payload as a JSON array
 */
@Nonnull
public Observable<JsonArray> listGuildEmojisRaw(@Nonnull final String guildId) {
    final OutboundRequest request =
            new OutboundRequest(Routes.LIST_GUILD_EMOJIS.withMajorParam(guildId), Map.of());
    return catnip().requester()
            .queue(request)
            .map(ResponsePayload::array);
}
/**
 * Tells whether the specified {@link Class} can be handled by the
 * {@link ObservableResponseConverterFunction}, i.e. whether it is assignable to
 * {@link Observable}, {@link Maybe}, {@link Single} or {@link Completable}.
 */
private static boolean isSupportedClass(Class<?> clazz) {
    final Class<?>[] reactiveTypes = {Observable.class, Maybe.class, Single.class, Completable.class};
    for (Class<?> reactiveType : reactiveTypes) {
        if (reactiveType.isAssignableFrom(clazz)) {
            return true;
        }
    }
    return false;
}
/**
 * Queues a request on the {@code LIST_VOICE_REGIONS} route and maps the response payload to a
 * JSON array.
 *
 * @return an observable of the response payload as a JSON array
 */
@Nonnull
@CheckReturnValue
public Observable<JsonArray> listVoiceRegionsRaw() {
    final OutboundRequest request = new OutboundRequest(Routes.LIST_VOICE_REGIONS, Map.of());
    return catnip().requester()
            .queue(request)
            .map(ResponsePayload::array);
}
/**
 * With the circuit breaker stubbed to grant a permission, two events composed with
 * {@code CircuitBreakerOperator} must both be delivered, {@code onSuccess} must be reported to
 * the circuit breaker and {@code onError} must never be reported.
 */
@Test
public void shouldSubscribeToObservableJust() {
    final String firstEvent = "Event 1";
    final String secondEvent = "Event 2";
    given(circuitBreaker.tryAcquirePermission()).willReturn(true);

    Observable.just(firstEvent, secondEvent)
        .compose(CircuitBreakerOperator.of(circuitBreaker))
        .test()
        .assertResult(firstEvent, secondEvent);

    then(circuitBreaker).should().onSuccess(anyLong(), any(TimeUnit.class));
    then(circuitBreaker).should(never()).onError(anyLong(), any(TimeUnit.class), any(Throwable.class));
}
/**
 * Saves a set of two sample users under the key {@code "set"}, loads it back using a
 * {@code Set<User>} type built with {@link TypeBuilder}, and prints the name and password of
 * every loaded user when {@code Preconditions.isNotBlank} accepts the loaded set.
 *
 * @param rxCache the cache used for saving and loading
 */
private static void testSet(RxCache rxCache) {
    final User first = new User();
    first.name = "tonySet1";
    first.password = "set1123456";

    final User second = new User();
    second.name = "tonySet12";
    second.password = "set12345";

    final Set<User> users = new HashSet<>();
    users.add(first);
    users.add(second);
    rxCache.save("set", users);

    final Type setOfUsers = TypeBuilder
            .newInstance(Set.class)
            .addTypeParam(User.class)
            .build();

    final Observable<Record<Set<User>>> loaded = rxCache.load2Observable("set", setOfUsers);
    loaded.subscribe(record -> {
        final Set<User> cachedUsers = record.getData();
        if (!Preconditions.isNotBlank(cachedUsers)) {
            return;
        }
        for (User cached : cachedUsers) {
            System.out.println(cached.name);
            System.out.println(cached.password);
        }
    });
}
/**
 * Queues a request on the {@code MODIFY_GUILD_EMBED} route for the given guild with a JSON body
 * holding {@code channel_id} and {@code enabled}, and maps the response payload to a JSON object.
 *
 * @param guildId major parameter of the route
 * @param channelId value written to the body's {@code channel_id} field
 * @param enabled value written to the body's {@code enabled} field
 * @param reason the reason passed along with the request
 * @return an observable of the response payload as a JSON object
 */
@Nonnull
@CheckReturnValue
public Observable<JsonObject> modifyGuildEmbedRaw(@Nonnull final String guildId, @Nullable final String channelId,
                                                  final boolean enabled, @Nullable final String reason) {
    final OutboundRequest request = new OutboundRequest(
            Routes.MODIFY_GUILD_EMBED.withMajorParam(guildId),
            Map.of(),
            JsonObject.builder().value("channel_id", channelId).value("enabled", enabled).done(),
            reason);
    return catnip().requester()
            .queue(request)
            .map(ResponsePayload::object);
}
/**
 * Stubs the rate limiter to report one second (in nanoseconds) from
 * {@code reservePermission()} and runs a single-element observable through
 * {@code RateLimiterOperator}, awaiting termination for up to two seconds.
 */
@Test
public void shouldDelaySubscription() {
    final long oneSecondInNanos = Duration.ofSeconds(1).toNanos();
    given(rateLimiter.reservePermission()).willReturn(oneSecondInNanos);

    Observable.just(1)
        .compose(RateLimiterOperator.of(rateLimiter))
        .test()
        .awaitDone(2, TimeUnit.SECONDS);
}
/**
 * Fetches up to {@link #limit(int) limit} entities, collecting them into an in-memory list via
 * {@code forEach} and then emitting the contents of that list through the returned observable.
 * <br><b>This method will keep all entities in memory</b>, so for unbounded
 * pagination it should be avoided.
 *
 * @return An observable emitting the fetched entities.
 */
@Nonnull
@CheckReturnValue
public Observable<T> fetch() {
    final List<T> collected = new ArrayList<>();
    return forEach(collected::add)
            // The value emitted by forEach is ignored; only the collected entities matter.
            .map(ignored -> Collections.unmodifiableList(collected))
            .flatMapIterable(entities -> entities);
}
/**
 * Fetches the raw invite data for a guild via {@code getGuildInvitesRaw}, converts each raw
 * result with {@code mapObjectContents} and the entity builder's {@code createCreatedInvite},
 * and emits the converted invites one by one.
 *
 * @param guildId the id passed to {@code getGuildInvitesRaw}
 * @return an observable emitting the converted invites
 */
@Nonnull
@CheckReturnValue
public Observable<CreatedInvite> getGuildInvites(@Nonnull final String guildId) {
    return getGuildInvitesRaw(guildId)
            .map(rawInvites -> mapObjectContents(entityBuilder()::createCreatedInvite).apply(rawInvites))
            .flatMapIterable(invites -> invites);
}
/**
 * Creates an {@link AuditLogPaginator} for the given guild. Each page is fetched by delegating
 * to {@code getGuildAuditLogRaw} with the paginator state's {@code "user"} and {@code "type"}
 * extras, the last seen id and the requested page size.
 *
 * @param guildId the guild whose audit log is paginated
 * @return a new paginator bound to the given guild
 */
public AuditLogPaginator getGuildAuditLog(@Nonnull final String guildId) {
    return new AuditLogPaginator(entityBuilder()) {
        @Nonnull
        @CheckReturnValue
        @Override
        protected Observable<JsonObject> fetchNext(@Nonnull final RequestState<AuditLogEntry> pageState,
                                                   @Nullable final String previousId,
                                                   @Nonnegative final int pageSize) {
            return getGuildAuditLogRaw(
                    guildId, pageState.extra("user"), previousId, pageState.extra("type"), pageSize);
        }
    };
}
/**
 * Wraps {@code getState().importRDB(input)} in an observable that completes right after the
 * import call returns, passes it through {@code executeOn} and blocks until it terminates.
 *
 * @param input the stream handed to the state's {@code importRDB}
 */
@Override
public void importRDB(InputStream input) {
    executeOn(Observable.create(emitter -> {
        getState().importRDB(input);
        // No items are emitted; completion alone signals that the import has finished.
        emitter.onComplete();
    })).blockingSubscribe();
}
/**
 * Queues a request on the {@code GET_GUILD_PRUNE_COUNT} route for the given guild with a
 * {@code days} query parameter and maps the response payload to a JSON object.
 *
 * @param guildId major parameter of the route
 * @param days value of the {@code days} query parameter
 * @return an observable of the response payload as a JSON object
 */
@Nonnull
@CheckReturnValue
public Observable<JsonObject> getGuildPruneCountRaw(@Nonnull final String guildId, @Nonnegative final int days) {
    final QueryStringBuilder queryBuilder = new QueryStringBuilder();
    final String query = queryBuilder.append("days", String.valueOf(days)).build();

    final OutboundRequest request = new OutboundRequest(
            Routes.GET_GUILD_PRUNE_COUNT.withMajorParam(guildId).withQueryString(query),
            Map.of());
    return catnip().requester()
            .queue(request)
            .map(ResponsePayload::object);
}
/**
 * For ten turns: pushes the turn number into a {@link BehaviorRelay}, takes the relay's first
 * element, flat-maps it to {@code "<v>, <v>"} and forwards every signal to a mock observer. The
 * mock must receive exactly that value followed by completion, and never an error.
 */
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    // FIXME the default used to be a plain null, which is not allowed
    BehaviorRelay<String> src = BehaviorRelay.createDefault("null");
    for (int turn = 0; turn < 10; turn++) {
        final Observer<Object> mock = TestHelper.mockObserver();
        final InOrder ordered = inOrder(mock);
        final String value = String.valueOf(turn);
        src.accept(value);
        System.out.printf("Turn: %d%n", turn);

        src.firstElement()
                .toObservable()
                .flatMap(t1 -> Observable.just(t1 + ", " + t1))
                .subscribe(new DefaultObserver<String>() {
                    @Override
                    public void onNext(String item) {
                        mock.onNext(item);
                    }

                    @Override
                    public void onError(Throwable error) {
                        mock.onError(error);
                    }

                    @Override
                    public void onComplete() {
                        mock.onComplete();
                    }
                });

        final String expected = value + ", " + value;
        ordered.verify(mock).onNext(expected);
        ordered.verify(mock).onComplete();
        verify(mock, never()).onError(any(Throwable.class));
    }
}
/**
 * Queues a request on the {@code CREATE_GUILD_ROLE} route for the given guild with the role
 * data's JSON as the body, and maps the response payload to a JSON object.
 *
 * @param guildId major parameter of the route
 * @param roleData the role data whose {@code toJson()} result is sent as the body
 * @param reason the reason passed along with the request
 * @return an observable of the response payload as a JSON object
 */
@Nonnull
@CheckReturnValue
public Observable<JsonObject> createGuildRoleRaw(@Nonnull final String guildId, @Nonnull final RoleData roleData,
                                                 @Nullable final String reason) {
    final OutboundRequest request = new OutboundRequest(
            Routes.CREATE_GUILD_ROLE.withMajorParam(guildId),
            Map.of(),
            roleData.toJson(),
            reason);
    return catnip().requester()
            .queue(request)
            .map(ResponsePayload::object);
}