下面列出了怎么用io.reactivex.rxjava3.core.Completable的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Builds an {@link ObservableTransformer} that invokes {@code doEffect} once for every effect
 * emitted by the upstream effects observable. Each invocation is wrapped in a {@link Completable}
 * and flattened into the stream, and the flattened {@link Completable} is then exposed as an
 * observable of the event type.
 *
 * @param doEffect the {@link Consumer} invoked with each requested effect object
 * @param scheduler the {@link Scheduler} each consumer invocation is subscribed on; when {@code
 *     null}, no {@code subscribeOn} is applied
 * @param <F> the type of Effect this transformer handles
 * @param <E> the Event type of the returned stream; these transformers are for effects that do
 *     not result in any events, but they still need to share the same Event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}
 */
static <F, E> ObservableTransformer<F, E> fromConsumer(
    final Consumer<F> doEffect, @Nullable final Scheduler scheduler) {
    return effectStream ->
        effectStream
            .flatMapCompletable(
                effect -> {
                    final Completable invocation =
                        Completable.fromAction(() -> doEffect.accept(effect));
                    if (scheduler == null) {
                        return invocation;
                    }
                    return invocation.subscribeOn(scheduler);
                })
            .toObservable();
}
/**
 * With a one-minute timeout configured, a completable built from the hello-world service call is
 * expected to complete and to report success to the time limiter.
 */
@Test
public void doNotTimeout() {
    given(helloWorldService.returnHelloWorld()).willReturn("hello");
    given(timeLimiter.getTimeLimiterConfig()).willReturn(toConfig(Duration.ofMinutes(1)));

    Completable source = Completable.fromRunnable(helloWorldService::returnHelloWorld);
    TestObserver<?> testObserver = source.compose(TimeLimiterTransformer.of(timeLimiter)).test();
    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    testObserver.assertComplete();
    then(timeLimiter).should().onSuccess();
}
/**
 * Subscribes the downstream observer according to the wait duration reserved from the rate
 * limiter: a negative value rejects the subscription with a {@code RequestNotPermitted} error,
 * zero subscribes to the upstream immediately, and a positive value (nanoseconds) postpones the
 * upstream subscription with a timer.
 *
 * @param downstream the observer to connect to the upstream or to signal the rejection to
 */
@Override
protected void subscribeActual(SingleObserver<? super T> downstream) {
    final long waitDuration = rateLimiter.reservePermission();

    if (waitDuration < 0) {
        // No permission could be reserved: hand over an empty disposable, then fail.
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(RequestNotPermitted.createRequestNotPermitted(rateLimiter));
        return;
    }

    if (waitDuration == 0) {
        upstream.subscribe(new RateLimiterSingleObserver(downstream));
        return;
    }

    // Positive wait: subscribe to the upstream once the timer completes.
    Completable.timer(waitDuration, TimeUnit.NANOSECONDS)
        .subscribe(() -> upstream.subscribe(new RateLimiterSingleObserver(downstream)));
}
/**
 * When the bulkhead grants a permission, a completed source is expected to complete through the
 * operator and {@code onComplete} is expected to be called on the bulkhead.
 */
@Test
public void shouldComplete() {
    given(bulkhead.tryAcquirePermission()).willReturn(true);

    Completable guarded = Completable.complete().compose(BulkheadOperator.of(bulkhead));
    guarded.test().assertComplete();

    then(bulkhead).should().onComplete();
}
/**
 * Adds a new pet to the store via {@code POST pet} with a JSON content type.
 *
 * @param body the Pet object that needs to be added to the store (required)
 * @return a {@link Completable} for the call
 */
@Headers("Content-Type:application/json")
@POST("pet")
Completable addPet(@retrofit2.http.Body Pet body);
/**
 * Updates an existing pet via {@code PUT pet} with a JSON content type.
 *
 * @param body the Pet object sent as the request body (required)
 * @return a {@link Completable} for the call
 */
@Headers("Content-Type:application/json")
@PUT("pet")
Completable updatePet(@retrofit2.http.Body Pet body);
/**
 * Creates an XmlItem via {@code POST fake/create_xml_item} with an XML content type.
 *
 * @param xmlItem the XmlItem sent as the request body (required)
 * @return a {@link Completable} for the call
 */
@Headers("Content-Type:application/xml")
@POST("fake/create_xml_item")
Completable createXmlItem(@retrofit2.http.Body XmlItem xmlItem);
/**
 * Queues a {@code MODIFY_GUILD_CHANNEL_POSITIONS} request whose body is a JSON array holding one
 * {@code id}/{@code position} object per entry of the given updater.
 *
 * @param updater supplies the guild id and the entries to send
 * @param reason  the reason passed along with the request; may be {@code null}
 * @return a {@link Completable} built from the queued request
 */
@Nonnull
@CheckReturnValue
public Completable modifyGuildChannelPositions(@Nonnull final PositionUpdater updater,
                                               @Nullable final String reason) {
    final JsonArray positions = new JsonArray();
    updater.entries()
            .stream()
            .map(entry -> JsonObject.builder()
                    .value("id", entry.getKey())
                    .value("position", entry.getValue())
                    .done())
            .forEach(positions::add);

    final OutboundRequest request = new OutboundRequest(
            Routes.MODIFY_GUILD_CHANNEL_POSITIONS.withMajorParam(updater.guildId()),
            Map.of(),
            positions,
            reason);
    return Completable.fromObservable(catnip().requester().queue(request));
}
/**
 * Queues a {@code DELETE_GUILD_EMOJI} request with an empty body for the given guild and emoji.
 *
 * @param guildId the guild id used as the route's major parameter
 * @param emojiId the emoji id substituted into the route
 * @param reason  the reason attached to the request; may be {@code null}
 * @return a {@link Completable} built from the queued request
 */
@Nonnull
public Completable deleteGuildEmoji(@Nonnull final String guildId, @Nonnull final String emojiId,
                                    @Nullable final String reason) {
    final OutboundRequest request =
            new OutboundRequest(Routes.DELETE_GUILD_EMOJI.withMajorParam(guildId), Map.of("emojis", emojiId))
                    .reason(reason)
                    .emptyBody(true);
    return Completable.fromObservable(catnip().requester().queue(request));
}
/**
 * Queues a {@code DELETE_GUILD_ROLE} request with an empty body for the given guild and role.
 *
 * @param guildId the guild id used as the route's major parameter
 * @param roleId  the role id substituted into the route
 * @param reason  the reason attached to the request; may be {@code null}
 * @return a {@link Completable} built from the queued request
 */
@Nonnull
@CheckReturnValue
public Completable deleteGuildRole(@Nonnull final String guildId, @Nonnull final String roleId,
                                   @Nullable final String reason) {
    final OutboundRequest request =
            new OutboundRequest(Routes.DELETE_GUILD_ROLE.withMajorParam(guildId), Map.of("role", roleId))
                    .reason(reason)
                    .emptyBody(true);
    return Completable.fromObservable(catnip().requester().queue(request));
}
/**
 * Queues a {@code REMOVE_GUILD_BAN} request with an empty body for the given guild and user.
 *
 * @param guildId the guild id used as the route's major parameter
 * @param userId  the user id substituted into the route
 * @param reason  the reason attached to the request; may be {@code null}
 * @return a {@link Completable} built from the queued request
 */
@Nonnull
public Completable removeGuildBan(@Nonnull final String guildId, @Nonnull final String userId,
                                  @Nullable final String reason) {
    final OutboundRequest request =
            new OutboundRequest(Routes.REMOVE_GUILD_BAN.withMajorParam(guildId), Map.of("user", userId))
                    .reason(reason)
                    .emptyBody(true);
    return Completable.fromObservable(catnip().requester().queue(request));
}
/**
 * With a zero timeout configured, a completable that only finishes after a one-minute timer is
 * expected to fail with a {@link TimeoutException} and to report that error to the time limiter.
 */
@Test
public void timeout() {
    given(timeLimiter.getTimeLimiterConfig()).willReturn(toConfig(Duration.ZERO));

    Completable delayed = Maybe.timer(1, TimeUnit.MINUTES)
        .flatMapCompletable(tick -> Completable.complete());
    TestObserver<?> testObserver = delayed.compose(TimeLimiterTransformer.of(timeLimiter)).test();
    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    testObserver.assertError(TimeoutException.class);
    then(timeLimiter).should().onError(any(TimeoutException.class));
}
/**
 * Adds a reaction to the message with the given id in this channel. For guild channels, the
 * {@code ADD_REACTIONS} and {@code READ_MESSAGE_HISTORY} permissions are checked first.
 *
 * @param messageId The id of the message to add a reaction to.
 * @param emoji The reaction to add.
 *
 * @return A Completable that completes when the reaction is added.
 */
@Nonnull
default Completable addReaction(@Nonnull final String messageId, @Nonnull final String emoji) {
    // The permission check only applies to guild channels.
    if (isGuild()) {
        PermissionUtil.checkPermissions(
                catnip(),
                asGuildChannel().guildId(),
                id(),
                Permission.ADD_REACTIONS,
                Permission.READ_MESSAGE_HISTORY);
    }
    return catnip().rest().channel().addReaction(id(), messageId, emoji);
}
/**
 * Adds a reaction, given as an {@code Emoji}, to the message with the given id in this channel.
 * For guild channels, the {@code ADD_REACTIONS} and {@code READ_MESSAGE_HISTORY} permissions are
 * checked first.
 *
 * @param messageId The id of the message to add a reaction to.
 * @param emoji The reaction to add.
 *
 * @return A Completable that completes when the reaction is added.
 */
@Nonnull
default Completable addReaction(@Nonnull final String messageId, @Nonnull final Emoji emoji) {
    // The permission check only applies to guild channels.
    if (isGuild()) {
        PermissionUtil.checkPermissions(
                catnip(),
                asGuildChannel().guildId(),
                id(),
                Permission.ADD_REACTIONS,
                Permission.READ_MESSAGE_HISTORY);
    }
    return catnip().rest().channel().addReaction(id(), messageId, emoji);
}
/**
 * Deletes a user's reaction on the given message. For guild channels, the
 * {@code MANAGE_MESSAGES} permission is checked first.
 *
 * @param messageId The id of the message to remove a reaction from.
 * @param userId The id of the user whose reaction is to be removed.
 * @param emoji The reaction to remove.
 *
 * @return A Completable that completes when the reaction is removed.
 */
@Nonnull
default Completable deleteUserReaction(@Nonnull final String messageId, @Nonnull final String userId,
                                       @Nonnull final String emoji) {
    // The permission check only applies to guild channels.
    if (isGuild()) {
        PermissionUtil.checkPermissions(
                catnip(),
                asGuildChannel().guildId(),
                id(),
                Permission.MANAGE_MESSAGES);
    }
    return catnip().rest().channel().deleteUserReaction(id(), messageId, userId, emoji);
}
/**
 * Deletes all reactions on the given message. For guild channels, the {@code MANAGE_MESSAGES}
 * permission is checked first.
 *
 * @param messageId The id of the message to remove all reactions from.
 *
 * @return A Completable that completes when the reactions are removed.
 */
@Nonnull
default Completable bulkRemoveReaction(@Nonnull final String messageId) {
    // The permission check only applies to guild channels.
    if (isGuild()) {
        PermissionUtil.checkPermissions(
                catnip(),
                asGuildChannel().guildId(),
                id(),
                Permission.MANAGE_MESSAGES);
    }
    return catnip().rest().channel().deleteAllReactions(id(), messageId);
}
/**
 * Deletes the webhook after checking the {@code MANAGE_WEBHOOKS} permission for its guild and
 * channel.
 *
 * @return A Completable that completes when the webhook is deleted.
 */
@Nonnull
@CheckReturnValue
default Completable delete() {
    PermissionUtil.checkPermissions(
            catnip(),
            guildId(),
            channelId(),
            Permission.MANAGE_WEBHOOKS);
    return catnip().rest().webhook().deleteWebhook(id());
}
/**
 * Deletes this message. When the self user is known and is not the author, the
 * {@code MANAGE_MESSAGES} permission is checked before the delete request is made.
 *
 * @param reason the reason passed along with the delete request; may be {@code null}
 * @return the {@link Completable} returned by the REST delete call
 */
@Nonnull
default Completable delete(@Nullable final String reason) {
    final User self = catnip().selfUser();
    final boolean authoredBySomeoneElse = self != null && !author().id().equals(self.id());
    if (authoredBySomeoneElse) {
        PermissionUtil.checkPermissions(
                catnip(),
                guildId(),
                channelId(),
                Permission.MANAGE_MESSAGES);
    }
    return catnip().rest().channel().deleteMessage(channelId(), id(), reason);
}
/**
 * Forwards the payload to the cache worker when its event type is listed in
 * {@code CACHE_EVENTS}. For any other event type, or when the cache update throws, the
 * {@link Completable} from {@code RxHelpers.completedCompletable} is returned instead; a thrown
 * exception is logged as a warning together with the pretty-printed payload.
 *
 * @param eventType the event type of the payload
 * @param shardId   the shard id passed to the cache worker
 * @param data      the payload data
 * @return the cache update {@link Completable}, or the fallback described above
 */
private Completable maybeCache(final String eventType, final int shardId, final JsonObject data) {
    if (!CACHE_EVENTS.contains(eventType)) {
        return RxHelpers.completedCompletable(catnip());
    }
    try {
        return catnip().cacheWorker().updateCache(eventType, shardId, data);
    } catch (final Exception e) {
        // Best effort: a failing cache update is logged and otherwise ignored.
        catnip().logAdapter().warn("Got error updating cache for payload {}", eventType, e);
        catnip().logAdapter().warn("Payload: {}", JsonUtil.encodePrettily(data));
        return RxHelpers.completedCompletable(catnip());
    }
}
/**
 * Stubs the rate limiter to reserve a one-second wait (in nanoseconds) and waits up to one second
 * on the resulting test observer. No assertion is made on the outcome.
 */
@Test
public void shouldDelaySubscription() {
    final long oneSecondInNanos = Duration.ofSeconds(1).toNanos();
    given(rateLimiter.reservePermission()).willReturn(oneSecondInNanos);

    Completable limited = Completable.complete().compose(RateLimiterOperator.of(rateLimiter));
    limited.test().awaitDone(1, TimeUnit.SECONDS);
}
/**
 * Queues an {@code ADD_GUILD_MEMBER_ROLE} request with an empty body for the given guild, user
 * and role.
 *
 * @param guildId the guild id used as the route's major parameter
 * @param userId  the user id substituted into the route
 * @param roleId  the role id substituted into the route
 * @param reason  the reason attached to the request; may be {@code null}
 * @return a {@link Completable} built from the queued request
 */
@Nonnull
public Completable addGuildMemberRole(@Nonnull final String guildId, @Nonnull final String userId,
                                      @Nonnull final String roleId, @Nullable final String reason) {
    final OutboundRequest request =
            new OutboundRequest(Routes.ADD_GUILD_MEMBER_ROLE.withMajorParam(guildId),
                    Map.of("user", userId, "role", roleId))
                    .reason(reason)
                    .emptyBody(true);
    return Completable.fromObservable(catnip().requester().queue(request));
}
/**
 * Queues a {@code DELETE_MESSAGE} request for the given channel and message and discards any
 * emitted response elements.
 *
 * @param channelId the channel id used as the route's major parameter
 * @param messageId the message id substituted into the route
 * @param reason    the reason attached to the request; may be {@code null}
 * @return a {@link Completable} built from the queued request
 */
@Nonnull
public Completable deleteMessage(@Nonnull final String channelId, @Nonnull final String messageId,
                                 @Nullable final String reason) {
    final OutboundRequest request =
            new OutboundRequest(Routes.DELETE_MESSAGE.withMajorParam(channelId), Map.of("message", messageId))
                    .reason(reason);
    return Completable.fromObservable(catnip().requester().queue(request));
}
/**
 * Edits a permission override described by a {@code PermissionOverride} object. The override's id
 * is forwarded as the overwrite id, and the override is treated as a member override when its
 * type is {@code OverrideType.MEMBER}.
 *
 * @param channelId the id of the channel the override belongs to
 * @param overwrite the override supplying the id and type
 * @param allowed   the permissions to allow
 * @param denied    the permissions to deny
 * @param reason    the reason passed along with the request; may be {@code null}
 * @return the {@link Completable} of the delegated call
 */
@Nonnull
public Completable editPermissionOverride(@Nonnull final String channelId, @Nonnull final PermissionOverride overwrite,
                                          @Nonnull final Collection<Permission> allowed,
                                          @Nonnull final Collection<Permission> denied,
                                          @Nullable final String reason) {
    final String overwriteId = overwrite.id();
    final boolean isMember = overwrite.type() == OverrideType.MEMBER;
    return editPermissionOverride(channelId, overwriteId, allowed, denied, isMember, reason);
}
/**
 * Queues a {@code CREATE_REACTION} request for the given channel and message, sending the
 * UTF-8-encoded emoji as a route parameter together with an empty JSON object as the body, and
 * discards any emitted response elements.
 *
 * @param channelId the channel id used as the route's major parameter
 * @param messageId the message id substituted into the route
 * @param emoji     the emoji to react with; it is passed through {@code encodeUTF8} first
 * @return a {@link Completable} built from the queued request
 */
@Nonnull
public Completable addReaction(@Nonnull final String channelId, @Nonnull final String messageId,
                               @Nonnull final String emoji) {
    final OutboundRequest request = new OutboundRequest(
            Routes.CREATE_REACTION.withMajorParam(channelId),
            Map.of("message", messageId, "emojis", encodeUTF8(emoji)),
            new JsonObject());
    return Completable.fromObservable(catnip().requester().queue(request));
}
/**
 * Subscribes the downstream subscriber according to the wait duration reserved from the rate
 * limiter: a negative value rejects the subscription with the error produced by
 * {@code createRequestNotPermitted}, zero subscribes to the upstream immediately, and a positive
 * value (nanoseconds) postpones the upstream subscription with a timer.
 *
 * @param downstream the subscriber to connect to the upstream or to signal the rejection to
 */
@Override
protected void subscribeActual(Subscriber<? super T> downstream) {
    final long waitDuration = rateLimiter.reservePermission();

    if (waitDuration < 0) {
        // No permission could be reserved: hand over an empty subscription, then fail.
        downstream.onSubscribe(EmptySubscription.INSTANCE);
        downstream.onError(createRequestNotPermitted(rateLimiter));
        return;
    }

    if (waitDuration == 0) {
        upstream.subscribe(new RateLimiterSubscriber(downstream));
        return;
    }

    // Positive wait: subscribe to the upstream once the timer completes.
    Completable.timer(waitDuration, TimeUnit.NANOSECONDS)
        .subscribe(() -> upstream.subscribe(new RateLimiterSubscriber(downstream)));
}
/**
 * Edits a permission override without supplying a reason; delegates to the overload that accepts
 * a reason, passing {@code null} for it.
 *
 * @param channelId   the id of the channel the override belongs to
 * @param overwriteId the id of the override to edit
 * @param allowed     the permissions to allow
 * @param denied      the permissions to deny
 * @param isMember    whether the override targets a member
 * @return the {@link Completable} of the delegated call
 */
@Nonnull
public Completable editPermissionOverride(@Nonnull final String channelId, @Nonnull final String overwriteId,
                                          @Nonnull final Collection<Permission> allowed,
                                          @Nonnull final Collection<Permission> denied,
                                          final boolean isMember) {
    return editPermissionOverride(
            channelId,
            overwriteId,
            allowed,
            denied,
            isMember,
            null);
}
/**
 * Queues a {@code DELETE_EMOJI_REACTIONS} request with an empty body for the given channel,
 * message and emoji.
 *
 * @param channelId the channel id used as the route's major parameter
 * @param messageId the message id substituted into the route
 * @param emoji     the emoji whose reactions are targeted; it is passed through
 *                  {@code encodeUTF8} first
 * @return a {@link Completable} built from the queued request
 */
@Nonnull
public Completable deleteEmojiReaction(@Nonnull final String channelId, @Nonnull final String messageId,
                                       @Nonnull final String emoji) {
    final OutboundRequest request =
            new OutboundRequest(Routes.DELETE_EMOJI_REACTIONS.withMajorParam(channelId),
                    Map.of("message", messageId, "emojis", encodeUTF8(emoji)))
                    .emptyBody(true);
    return Completable.fromObservable(catnip().requester().queue(request));
}
/**
 * When the rate limiter reserves a zero wait duration, a completed source is expected to complete
 * through the operator.
 */
@Test
public void shouldEmitCompleted() {
    final long noWaitInNanos = Duration.ofSeconds(0).toNanos();
    given(rateLimiter.reservePermission()).willReturn(noWaitInNanos);

    Completable limited = Completable.complete().compose(RateLimiterOperator.of(rateLimiter));
    limited.test().assertComplete();
}
/**
 * Queues a {@code DELETE_CHANNEL_PERMISSION} request with an empty body for the given channel and
 * override.
 *
 * @param channelId   the channel id used as the route's major parameter
 * @param overwriteId the override id substituted into the route
 * @param reason      the reason attached to the request; may be {@code null}
 * @return a {@link Completable} built from the queued request
 */
@Nonnull
public Completable deletePermissionOverride(@Nonnull final String channelId,
                                            @Nonnull final String overwriteId, @Nullable final String reason) {
    final OutboundRequest request =
            new OutboundRequest(Routes.DELETE_CHANNEL_PERMISSION.withMajorParam(channelId),
                    Map.of("overwrite", overwriteId))
                    .reason(reason)
                    .emptyBody(true);
    return Completable.fromObservable(catnip().requester().queue(request));
}
/**
 * Queues an {@code EDIT_CHANNEL_PERMISSIONS} request for the given channel and override. The JSON
 * body carries {@code allow} and {@code deny} values computed by {@code Permission.from} and a
 * {@code type} of {@code "member"} or {@code "role"}.
 *
 * @param channelId   the channel id used as the route's major parameter
 * @param overwriteId the override id substituted into the route
 * @param allowed     the permissions to allow
 * @param denied      the permissions to deny
 * @param isMember    {@code true} to send type {@code "member"}, {@code false} for {@code "role"}
 * @param reason      the reason passed along with the request; may be {@code null}
 * @return a {@link Completable} built from the queued request
 */
@Nonnull
public Completable editPermissionOverride(@Nonnull final String channelId, @Nonnull final String overwriteId,
                                          @Nonnull final Collection<Permission> allowed,
                                          @Nonnull final Collection<Permission> denied,
                                          final boolean isMember, @Nullable final String reason) {
    final String overrideType = isMember ? "member" : "role";
    final JsonObject body = JsonObject.builder()
            .value("allow", Permission.from(allowed))
            .value("deny", Permission.from(denied))
            .value("type", overrideType)
            .done();

    final OutboundRequest request = new OutboundRequest(
            Routes.EDIT_CHANNEL_PERMISSIONS.withMajorParam(channelId),
            Map.of("overwrite", overwriteId),
            body,
            reason);
    return Completable.fromObservable(catnip().requester().queue(request));
}