下面列出了 io.reactivex.rxjava3.core.Single 类的 API 用法示例代码,也可以点击链接到 GitHub 查看完整源代码。
/**
 * Binds a delayed {@code Single} until the {@code "stop"} lifecycle event and checks that an
 * unrelated event produces no error, while {@code "stop"} ends the stream with a
 * {@link CancellationException} and no value.
 */
@Test
public void twoEvents() {
    TestObserver<String> observer = Single.just("1")
        .delay(1, TimeUnit.MILLISECONDS, testScheduler)
        .compose(RxLifecycle.<String, String>bindUntilEvent(lifecycle, "stop"))
        .test();

    // An event other than "stop" must not produce an error.
    lifecycle.onNext("keep going");
    observer.assertNoErrors();

    // Emit "stop" before the test scheduler is advanced past the delay.
    lifecycle.onNext("stop");
    testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    observer.assertNoValues()
        .assertError(CancellationException.class);
}
/**
 * Repeats a callable-backed {@code Single} decorated with the circuit breaker operator and
 * expects the service to be called twice, two successes to be recorded on the circuit breaker
 * and no error to be recorded.
 */
@Test
public void shouldSubscribeToMonoFromCallableMultipleTimes() {
    // Stubbing of the two collaborators is independent of each other.
    given(helloWorldService.returnHelloWorld()).willReturn("Hello World");
    given(circuitBreaker.tryAcquirePermission()).willReturn(true);

    Single.fromCallable(() -> helloWorldService.returnHelloWorld())
        .compose(CircuitBreakerOperator.of(circuitBreaker))
        .repeat(2)
        .test()
        .assertResult("Hello World", "Hello World");

    then(circuitBreaker).should(never()).onError(anyLong(), any(TimeUnit.class), any(Throwable.class));
    then(circuitBreaker).should(times(2)).onSuccess(anyLong(), any(TimeUnit.class));
    then(helloWorldService).should(times(2)).returnHelloWorld();
}
/**
 * Requests the gateway-bot info through the REST client, stores a valid result in
 * {@code gatewayInfo} and emits it.
 *
 * @return a {@code Single} emitting the fetched gateway info; it fails with a
 *         {@link RuntimeException} when the fetched info reports itself as not valid.
 */
@Nonnull
@Override
public Single<GatewayInfo> fetchGatewayInfo() {
    return rest.user().getGatewayBot()
            .map(info -> {
                if(!info.valid()) {
                    throw new RuntimeException("Gateway info not valid! Is your token valid?");
                }
                // Remember the valid info before passing it downstream.
                gatewayInfo.set(info);
                return info;
            })
            // NOTE(review): this callback throws from inside doOnError. RxJava appears to deliver
            // that as a CompositeException holding both the upstream error and this wrapper —
            // confirm that is the error shape callers should see.
            .doOnError(e -> {
                throw new RuntimeException(e);
            });
}
/**
 * Binds a delayed {@code Single} to the lifecycle and checks that a single lifecycle event,
 * emitted before the delay elapses, ends the stream with a {@link CancellationException}
 * and no value.
 */
@Test
public void oneEvent() {
    TestObserver<String> observer = Single.just("1")
        .delay(1, TimeUnit.MILLISECONDS, testScheduler)
        .compose(RxLifecycle.<String, String>bind(lifecycle))
        .test();

    // Nothing has happened yet: the value is still delayed on the test scheduler.
    observer.assertNoValues()
        .assertNoErrors();

    lifecycle.onNext("stop");
    testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    observer.assertNoValues()
        .assertError(CancellationException.class);
}
/**
 * Builds and subscribes the message-processing pipeline.
 * <p>
 * Batches from {@code messageSource} are logged, flattened into individual messages, and each
 * message is passed to {@code messageHandler.handleMessage} inside a deferred {@code Single}.
 * The results are delivered to a new {@code SimpleSubscriber}.
 */
public void start() {
// WARNING: this code doesn't work as expected
messageSource.getMessageBatches()
// Subscribe to the batch source on a scheduler backed by a new single-thread executor.
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.doOnNext(batch -> logger.log(batch.toString()))
// Flatten each batch into its individual messages.
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
// defer() postpones handleMessage(m) until the inner Single is subscribed.
.flatMapSingle(m -> Single.defer(() -> Single.just(messageHandler.handleMessage(m)))
// NOTE(review): threadPoolScheduler(...) is called inside this lambda, i.e. once per message;
// presumably each call builds a new scheduler — confirm whether one shared instance was intended.
.subscribeOn(threadPoolScheduler(threads, threadPoolQueueSize)))
.subscribeWith(new SimpleSubscriber<>(threads, 1));
}
/**
 * Subscribes to the given {@code Single} and bridges its outcome into a new
 * {@code AsyncResponseImpl} created for the message: a success value is passed to
 * {@code resume}, a failure is passed to {@code handleThrowable}.
 *
 * @param inMessage the message the asynchronous response is created for
 * @param single    the source to subscribe to
 * @return the asynchronous response that was wired to the source
 * @throws IllegalStateException if {@code subscribe} returns {@code null}
 */
protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) {
    final AsyncResponseImpl response = new AsyncResponseImpl(inMessage);
    final Disposable subscription = single.subscribe(
            response::resume,
            error -> handleThrowable(response, error));
    if (subscription != null) {
        return response;
    }
    throw new IllegalStateException("Subscribe did not return a Disposable");
}
/**
 * Submits this edit by calling {@code modifyChannel} with the attached channel's id, this
 * object and a {@code null} third argument.
 *
 * @return the {@code Single} returned by {@code modifyChannel}
 * @throws IllegalStateException if no channel object is attached
 */
@Nonnull
public Single<GuildChannel> submit() {
    if(channel != null) {
        return channel.catnip().rest().channel().modifyChannel(channel.id(), this, null);
    }
    throw new IllegalStateException("Cannot submit edit without a channel object! Please use RestChannel directly instead");
}
/**
 * Wraps the given query in a {@code Single} that emits one query result as a {@code List}.
 *
 * @param query the query whose result is observed once per subscription
 * @param <T>   the element type of the query result
 * @return a {@code Single} emitting the result list unless it has been disposed first
 */
public static <T> Single<List<T>> single(final Query<T> query) {
    return Single.create(emitter -> {
        // no need to cancel, single never subscribes
        query.subscribe().single().observer(results -> {
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onSuccess(results);
        });
    });
}
/**
 * Send a message to this channel with the specified content.
 * <p>
 * For guild channels the {@code SEND_MESSAGES} permission is checked before the request is
 * made, and the guild id is set on the resulting message.
 *
 * @param content The text content to send.
 *
 * @return A Single that emits the message returned by the REST call.
 */
@Nonnull
default Single<Message> sendMessage(@Nonnull final String content) {
    if(isGuild()) {
        PermissionUtil.checkPermissions(catnip(), asGuildChannel().guildId(), id(),
                Permission.SEND_MESSAGES);
    }
    final Single<Message> pending = catnip().rest().channel().sendMessage(id(), content);
    if(!isGuild()) {
        return pending;
    }
    // Inject guild manually because Discord does not send it in response
    return pending.map(msg -> ((MessageImpl) msg).guildIdAsLong(asGuildChannel().guildIdAsLong()));
}
/**
 * Verifies that a {@code Single} composed with the rate limiter operator signals
 * {@code RequestNotPermitted} and does not complete when {@code reservePermission()} is
 * stubbed to return {@code -1}.
 */
@Test
public void shouldEmitErrorWithRequestNotPermittedException() {
// Stub the rate limiter so that reserving a permission yields -1.
given(rateLimiter.reservePermission()).willReturn(-1L);
Single.just(1)
.compose(RateLimiterOperator.of(rateLimiter))
.test()
.assertError(RequestNotPermitted.class)
.assertNotComplete();
}
/**
 * Lifts a possibly-{@code null} value into a {@code Single}.
 *
 * @param id   identifier used only in the error message
 * @param data the value to emit, or {@code null} if there is none
 * @param <I>  type of the identifier
 * @param <T>  type of the value
 * @return a {@code Single} emitting {@code data}, or one failing with an
 *         {@link IllegalArgumentException} when {@code data} is {@code null}
 */
protected <I, T> Single<T> or(final I id, final T data) {
    if(data != null) {
        return Single.just(data);
    }
    return Single.error(new IllegalArgumentException("No entity for: " + id));
}
/**
 * Verifies that an upstream {@link RuntimeException} passes through the time limiter
 * transformer and is reported to the time limiter via {@code onError}.
 */
@Test
public void otherError() {
    given(timeLimiter.getTimeLimiterConfig()).willReturn(toConfig(Duration.ZERO));

    TestObserver<?> failing = Single.error(new RuntimeException())
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test();
    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    failing.assertError(RuntimeException.class);
    then(timeLimiter).should().onError(any(RuntimeException.class));
}
/**
 * Submits this edit by calling {@code modifyChannel} with the attached channel's id, this
 * object and the given reason.
 *
 * @param reason value forwarded as the last argument of {@code modifyChannel}; may be {@code null}
 * @return the {@code Single} returned by {@code modifyChannel}
 * @throws IllegalStateException if no channel object is attached
 */
@Nonnull
public Single<GuildChannel> submit(@Nullable final String reason) {
    if(channel != null) {
        return channel.catnip().rest().channel().modifyChannel(channel.id(), this, reason);
    }
    throw new IllegalStateException("Cannot submit edit without a channel object! Please use RestChannel directly instead");
}
/**
 * Binds a delayed {@code Single} using {@code CORRESPONDING_EVENTS} and checks that after the
 * {@code "create"} and {@code "start"} events the value is still delivered and the stream
 * completes.
 */
@Test
public void twoOpenEvents() {
    TestObserver<String> observer = Single.just("1")
        .delay(1, TimeUnit.MILLISECONDS, testScheduler)
        .compose(RxLifecycle.<String, String>bind(lifecycle, CORRESPONDING_EVENTS))
        .test();
    observer.assertNoValues();

    lifecycle.onNext("create");
    lifecycle.onNext("start");
    // Let the delay elapse so the value can be emitted.
    testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    observer.assertValue("1")
        .assertComplete();
}
/**
 * Builds and subscribes the message-processing pipeline.
 * <p>
 * Batches from {@code messageSource} are logged, flattened into individual messages, and each
 * message is passed to {@code messageHandler.handleMessage} inside a deferred {@code Single}
 * subscribed on one scheduler created up front. The results are delivered to a new
 * {@code SimpleSubscriber}.
 */
public void start() {
// WARNING: this code doesn't work as expected
// The scheduler is obtained once here and reused for every message below.
Scheduler scheduler = threadPoolScheduler(threads, threadPoolQueueSize);
messageSource.getMessageBatches()
// Subscribe to the batch source on a scheduler backed by a new single-thread executor.
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.doOnNext(batch -> logger.log(batch.toString()))
// Flatten each batch into its individual messages.
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
// defer() postpones handleMessage(m) until the inner Single is subscribed.
.flatMapSingle(m -> Single.defer(() -> Single.just(messageHandler.handleMessage(m)))
.subscribeOn(scheduler))
.subscribeWith(new SimpleSubscriber<>(threads, 1));
}
/**
 * Edits a message by delegating to the four-argument {@code editMessage} overload with an
 * empty set as the last argument.
 *
 * @param channelId id of the channel, forwarded unchanged
 * @param messageId id of the message, forwarded unchanged
 * @param message   the message options, forwarded unchanged
 * @return the {@code Single} returned by the four-argument overload
 */
@Nonnull
public Single<Message> editMessage(@Nonnull final String channelId, @Nonnull final String messageId,
@Nonnull final MessageOptions message) {
return editMessage(channelId, messageId, message, Set.of());
}
/**
 * Fetches the raw application information, maps it through the entity builder's
 * {@code createApplicationInfo} and exposes the result as a {@code Single}.
 *
 * @return a {@code Single} built from the mapped raw observable
 */
@Nonnull
@CheckReturnValue
public Single<ApplicationInfo> getCurrentApplicationInformation() {
    return Single.fromObservable(
            getCurrentApplicationInformationRaw()
                    .map(entityBuilder()::createApplicationInfo));
}
/**
 * Issues the raw delete-channel request, maps its result through the entity builder's
 * {@code createChannel} and exposes it as a {@code Single}.
 *
 * @param channelId id forwarded to the raw request
 * @param reason    reason forwarded to the raw request; may be {@code null}
 * @return a {@code Single} built from the mapped raw observable
 */
@Nonnull
@CheckReturnValue
public Single<Channel> deleteChannel(@Nonnull final String channelId, @Nullable final String reason) {
    return Single.fromObservable(
            deleteChannelRaw(channelId, reason)
                    .map(entityBuilder()::createChannel));
}
/**
 * Compares the two country streams pairwise.
 * <p>
 * The previous body returned {@code null}, which makes any caller that subscribes to or
 * chains on the result fail with a {@link NullPointerException}.
 *
 * @param countryObservable1 the first stream to compare
 * @param countryObservable2 the second stream to compare
 * @return a {@code Single} emitting {@code true} when both streams emit equal items in the
 *         same order and the same number of them, {@code false} otherwise
 */
@Override
public Single<Boolean> areEmittingSameSequences(Observable<Country> countryObservable1,
                                                Observable<Country> countryObservable2) {
    return Observable.sequenceEqual(countryObservable1, countryObservable2);
}
/**
 * Serves a request by gathering numbers from two sources, querying the backend for each number
 * and returning the backend responses joined line by line.
 * <p>
 * The numbers come from (1) the request path and body, split with {@code NUM_SPLITTER} and
 * parsed as longs, and (2) a simulated blocking lookup that yields a fixed list. For every
 * number a {@code GET /square/<num>} is sent through {@code backendClient}; each response body
 * is appended to the result followed by a newline. Any error in the pipeline is converted into
 * a response via {@code HttpResponse.ofFailure}.
 *
 * @param ctx the context of the request being served
 * @param req the incoming request
 * @return a response backed by the completion stage of the assembled pipeline
 */
@Override
public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) {
final Scheduler contextAwareScheduler = Schedulers.from(ctx.contextAwareExecutor());
// This logic mimics using a blocking method, which would usually be something like a MySQL
// database query using JDBC.
final Flowable<Long> fetchNumsFromFakeDb =
Single.fromCallable(
() -> {
// The context is mounted in a thread-local, meaning it is available to all
// logic such as tracing.
checkState(ServiceRequestContext.current() == ctx);
checkState(!ctx.eventLoop().inEventLoop());
Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(50));
return ImmutableList.of(23L, -23L);
})
// Always run blocking logic on the blocking task executor. By using
// ServiceRequestContext.blockingTaskExecutor, you also ensure the context is mounted
// inside the logic (e.g., your DB call will be traced!).
.subscribeOn(Schedulers.from(ctx.blockingTaskExecutor()))
.flattenAsFlowable(l -> l);
final Flowable<Long> extractNumsFromRequest =
Single.fromCompletionStage(req.aggregate())
// Unless you know what you're doing, always use subscribeOn with the context
// executor to have the context mounted and stay on a single thread to reduce
// concurrency issues.
.subscribeOn(contextAwareScheduler)
.flatMapPublisher(request -> {
// The context is mounted in a thread-local, meaning it is available to all
// logic such as tracing.
checkState(ServiceRequestContext.current() == ctx);
checkState(ctx.eventLoop().inEventLoop());
final List<Long> nums = new ArrayList<>();
// Tokens come from the path (without its first character) followed by the body.
for (String token : Iterables.concat(
NUM_SPLITTER.split(request.path().substring(1)),
NUM_SPLITTER.split(request.contentUtf8()))) {
nums.add(Long.parseLong(token));
}
return Flowable.fromIterable(nums);
});
final Single<HttpResponse> response =
Flowable.concatArrayEager(extractNumsFromRequest, fetchNumsFromFakeDb)
// Unless you know what you're doing, always use subscribeOn with the context executor
// to have the context mounted and stay on a single thread to reduce concurrency issues.
.subscribeOn(contextAwareScheduler)
// When concatenating flowables, you should almost always call observeOn with the
// context executor because we don't know here whether the subscription is on it or
// something like a blocking task executor.
.observeOn(contextAwareScheduler)
.flatMapSingle(num -> {
// The context is mounted in a thread-local, meaning it is available to all logic
// such as tracing.
checkState(ServiceRequestContext.current() == ctx);
checkState(ctx.eventLoop().inEventLoop());
return Single.fromCompletionStage(backendClient.get("/square/" + num).aggregate());
})
.map(AggregatedHttpResponse::contentUtf8)
// Accumulate every backend body into one builder, one body per line.
.collectInto(new StringBuilder(), (current, item) -> current.append(item).append('\n'))
.map(content -> HttpResponse.of(content.toString()))
// Turn any pipeline failure into a failure response instead of an error signal.
.onErrorReturn(HttpResponse::ofFailure);
return HttpResponse.from(response.toCompletionStage());
}
/**
 * Issues the raw modify-channel request, maps its result through the entity builder's
 * {@code createGuildChannel} and exposes it as a {@code Single}.
 *
 * @param channelId id forwarded to the raw request
 * @param fields    the edit fields forwarded to the raw request
 * @param reason    reason forwarded to the raw request; may be {@code null}
 * @return a {@code Single} built from the mapped raw observable
 */
@Nonnull
public Single<GuildChannel> modifyChannel(@Nonnull final String channelId,
                                          @Nonnull final ChannelEditFields fields,
                                          @Nullable final String reason) {
    return Single.fromObservable(
            modifyChannelRaw(channelId, fields, reason)
                    .map(entityBuilder()::createGuildChannel));
}
/**
 * Issues the raw get-guild request, maps its result through the entity builder's
 * {@code createGuild} and exposes it as a {@code Single}.
 *
 * @param guildId id forwarded to the raw request
 * @return a {@code Single} built from the mapped raw observable
 */
@Nonnull
@CheckReturnValue
public Single<Guild> getGuild(@Nonnull final String guildId) {
    return Single.fromObservable(
            getGuildRaw(guildId)
                    .map(entityBuilder()::createGuild));
}
/**
 * Forwards the request to {@code asyncStub.unlockWallet}, handing it an observer that wraps
 * the emitter of the returned {@code Single}.
 *
 * @param request the request passed to the stub unchanged
 * @return the {@code Single} created by {@code DefaultSingle.createDefault}
 */
@Override
public Single<com.github.lightningnetwork.lnd.lnrpc.UnlockWalletResponse> unlockWallet(
        com.github.lightningnetwork.lnd.lnrpc.UnlockWalletRequest request) {
    return DefaultSingle.createDefault(
            emitter -> asyncStub.unlockWallet(request, new RemoteLndSingleObserver<>(emitter)));
}
/**
 * Looks up the self user via {@code selfUser()} and passes it to {@code or} under the label
 * {@code "self user"}.
 *
 * @return the {@code Single} produced by {@code or}
 */
@Nonnull
@Override
public Single<User> selfUserAsync() {
    final User self = selfUser();
    return or("self user", self);
}
/**
 * Edits a message to the given text content by building a message from a new
 * {@code MessageOptions} and delegating to another {@code editMessage} overload.
 *
 * @param channelId id of the channel, forwarded unchanged
 * @param messageId id of the message, forwarded unchanged
 * @param content   the text set on the options before the message is built
 * @return the {@code Single} returned by the overload delegated to
 */
@Nonnull
public Single<Message> editMessage(@Nonnull final String channelId, @Nonnull final String messageId,
@Nonnull final String content) {
return editMessage(channelId, messageId, new MessageOptions().content(content).buildMessage());
}
/**
 * Edits a message by delegating to the four-argument {@code editMessage} overload with an
 * empty set as the last argument.
 *
 * @param channelId id of the channel, forwarded unchanged
 * @param messageId id of the message, forwarded unchanged
 * @param message   the message, forwarded unchanged
 * @return the {@code Single} returned by the four-argument overload
 */
@Nonnull
public Single<Message> editMessage(@Nonnull final String channelId, @Nonnull final String messageId,
@Nonnull final Message message) {
return editMessage(channelId, messageId, message, Set.of());
}
/**
 * Forwards the request to {@code asyncStub.walletBalance}, handing it an observer that wraps
 * the emitter of the returned {@code Single}.
 *
 * @param request the request passed to the stub unchanged
 * @return the {@code Single} created by {@code DefaultSingle.createDefault}
 */
@Override
public Single<com.github.lightningnetwork.lnd.lnrpc.WalletBalanceResponse> walletBalance(
        com.github.lightningnetwork.lnd.lnrpc.WalletBalanceRequest request) {
    return DefaultSingle.createDefault(
            emitter -> asyncStub.walletBalance(request, new RemoteLndSingleObserver<>(emitter)));
}
/**
 * Looks up the DM channel for the given id via {@code dmChannel(id)} and passes it to
 * {@code or} together with the id.
 *
 * @param id the id used for the lookup and forwarded to {@code or}
 * @return the {@code Single} produced by {@code or}
 */
@Nonnull
@Override
public Single<UserDMChannel> dmChannelAsync(final long id) {
    final UserDMChannel dm = dmChannel(id);
    return or(id, dm);
}
/**
 * Deletes a channel by delegating to the two-argument {@code deleteChannel} overload with a
 * {@code null} second argument.
 *
 * @param channelId id of the channel, forwarded unchanged
 * @return the {@code Single} returned by the two-argument overload
 */
@Nonnull
@CheckReturnValue
public Single<Channel> deleteChannel(@Nonnull final String channelId) {
return deleteChannel(channelId, null);
}
/**
 * Forwards the request to {@code asyncStub.listUnspent}, handing it an observer that wraps
 * the emitter of the returned {@code Single}.
 *
 * @param request the request passed to the stub unchanged
 * @return the {@code Single} created by {@code DefaultSingle.createDefault}
 */
@Override
public Single<com.github.lightningnetwork.lnd.lnrpc.ListUnspentResponse> listUnspent(
        com.github.lightningnetwork.lnd.lnrpc.ListUnspentRequest request) {
    return DefaultSingle.createDefault(
            emitter -> asyncStub.listUnspent(request, new RemoteLndSingleObserver<>(emitter)));
}