类io.reactivex.rxjava3.core.Single源码实例Demo

下面列出了怎么用io.reactivex.rxjava3.core.Single的API类实例代码及写法,或者点击链接到github查看源代码。

@Test
public void twoEvents() {
    TestObserver<String> testObserver = Single.just("1")
        .delay(1, TimeUnit.MILLISECONDS, testScheduler)
        .compose(RxLifecycle.<String, String>bindUntilEvent(lifecycle, "stop"))
        .test();

    lifecycle.onNext("keep going");
    testObserver.assertNoErrors();

    lifecycle.onNext("stop");
    testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    testObserver.assertNoValues();
    testObserver.assertError(CancellationException.class);
}
 
源代码2 项目: resilience4j   文件: SingleCircuitBreakerTest.java
@Test
public void shouldSubscribeToMonoFromCallableMultipleTimes() {
    given(circuitBreaker.tryAcquirePermission()).willReturn(true);
    given(helloWorldService.returnHelloWorld()).willReturn("Hello World");

    Single.fromCallable(() -> helloWorldService.returnHelloWorld())
        .compose(CircuitBreakerOperator.of(circuitBreaker))
        .repeat(2)
        .test()
        .assertResult("Hello World", "Hello World");

    then(helloWorldService).should(times(2)).returnHelloWorld();
    then(circuitBreaker).should(times(2)).onSuccess(anyLong(), any(TimeUnit.class));
    then(circuitBreaker).should(never())
        .onError(anyLong(), any(TimeUnit.class), any(Throwable.class));
}
 
源代码3 项目: catnip   文件: CatnipImpl.java
@Nonnull
@Override
public Single<GatewayInfo> fetchGatewayInfo() {
    return rest.user().getGatewayBot()
            .map(g -> {
                if(g.valid()) {
                    gatewayInfo.set(g);
                    return g;
                } else {
                    throw new RuntimeException("Gateway info not valid! Is your token valid?");
                }
            })
            .doOnError(e -> {
                throw new RuntimeException(e);
            });
}
 
@Test
public void oneEvent() {
    TestObserver<String> testObserver = Single.just("1")
        .delay(1, TimeUnit.MILLISECONDS, testScheduler)
        .compose(RxLifecycle.<String, String>bind(lifecycle))
        .test();

    testObserver.assertNoValues();
    testObserver.assertNoErrors();

    lifecycle.onNext("stop");
    testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    testObserver.assertNoValues();
    testObserver.assertError(CancellationException.class);
}
 
源代码5 项目: code-examples   文件: ReactiveBatchProcessorV2.java
public void start() {
  // WARNING: this code doesn't work as expected
  messageSource.getMessageBatches()
      .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
      .doOnNext(batch -> logger.log(batch.toString()))
      .flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
      .flatMapSingle(m -> Single.defer(() -> Single.just(messageHandler.handleMessage(m)))
          .subscribeOn(threadPoolScheduler(threads, threadPoolQueueSize)))
      .subscribeWith(new SimpleSubscriber<>(threads, 1));
}
 
源代码6 项目: cxf   文件: ReactiveIOInvoker.java
protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) {
    final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
    Disposable d = single.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
    if (d == null) {
        throw new IllegalStateException("Subscribe did not return a Disposable");
    }
    return asyncResponse;
}
 
源代码7 项目: catnip   文件: GuildChannel.java
@Nonnull
public Single<GuildChannel> submit() {
    if(channel == null) {
        throw new IllegalStateException("Cannot submit edit without a channel object! Please use RestChannel directly instead");
    }
    return channel.catnip().rest().channel().modifyChannel(channel.id(), this, null);
}
 
源代码8 项目: objectbox-java   文件: RxQuery.java
/**
 * The returned Single emits one Query result as a List.
 */
public static <T> Single<List<T>> single(final Query<T> query) {
    return Single.create(emitter -> {
        query.subscribe().single().observer(data -> {
            if (!emitter.isDisposed()) {
                emitter.onSuccess(data);
            }
        });
        // no need to cancel, single never subscribes
    });
}
 
源代码9 项目: catnip   文件: MessageChannel.java
/**
 * Send a message to this channel with the specified content.
 *
 * @param content The text content to send.
 *
 * @return A Observable that completes when the message is sent.
 */
@Nonnull
default Single<Message> sendMessage(@Nonnull final String content) {
    if(isGuild()) {
        PermissionUtil.checkPermissions(catnip(), asGuildChannel().guildId(), id(),
                Permission.SEND_MESSAGES);
    }
    final Single<Message> future = catnip().rest().channel().sendMessage(id(), content);
    // Inject guild manually because Discord does not send it in response
    if(isGuild()) {
        return future.map(msg -> ((MessageImpl) msg).guildIdAsLong(asGuildChannel().guildIdAsLong()));
    }
    return future;
}
 
源代码10 项目: resilience4j   文件: SingleRateLimiterTest.java
@Test
public void shouldEmitErrorWithRequestNotPermittedException() {
    given(rateLimiter.reservePermission()).willReturn(-1L);

    Single.just(1)
        .compose(RateLimiterOperator.of(rateLimiter))
        .test()
        .assertError(RequestNotPermitted.class)
        .assertNotComplete();
}
 
源代码11 项目: catnip   文件: MemoryEntityCache.java
protected <I, T> Single<T> or(final I id, final T data) {
    if(data == null) {
        return Single.error(new IllegalArgumentException("No entity for: " + id));
    } else {
        return Single.just(data);
    }
}
 
@Test
public void otherError() {
    given(timeLimiter.getTimeLimiterConfig())
        .willReturn(toConfig(Duration.ZERO));
    TestObserver<?> observer = Single.error(new RuntimeException())
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test();

    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    observer.assertError(RuntimeException.class);
    then(timeLimiter).should()
        .onError(any(RuntimeException.class));
}
 
源代码13 项目: catnip   文件: GuildChannel.java
@Nonnull
public Single<GuildChannel> submit(@Nullable final String reason) {
    if(channel == null) {
        throw new IllegalStateException("Cannot submit edit without a channel object! Please use RestChannel directly instead");
    }
    return channel.catnip().rest().channel().modifyChannel(channel.id(), this, reason);
}
 
@Test
public void twoOpenEvents() {
    TestObserver<String> testObserver = Single.just("1")
        .delay(1, TimeUnit.MILLISECONDS, testScheduler)
        .compose(RxLifecycle.<String, String>bind(lifecycle, CORRESPONDING_EVENTS))
        .test();

    testObserver.assertNoValues();

    lifecycle.onNext("create");
    lifecycle.onNext("start");
    testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    testObserver.assertValue("1");
    testObserver.assertComplete();
}
 
源代码15 项目: code-examples   文件: ReactiveBatchProcessorV3.java
public void start() {
  // WARNING: this code doesn't work as expected
  Scheduler scheduler = threadPoolScheduler(threads, threadPoolQueueSize);

  messageSource.getMessageBatches()
      .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
      .doOnNext(batch -> logger.log(batch.toString()))
      .flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
      .flatMapSingle(m -> Single.defer(() -> Single.just(messageHandler.handleMessage(m)))
          .subscribeOn(scheduler))
      .subscribeWith(new SimpleSubscriber<>(threads, 1));
}
 
源代码16 项目: catnip   文件: RestChannel.java
@Nonnull
public Single<Message> editMessage(@Nonnull final String channelId, @Nonnull final String messageId,
                                   @Nonnull final MessageOptions message) {
    return editMessage(channelId, messageId, message, Set.of());
}
 
源代码17 项目: catnip   文件: RestUser.java
@Nonnull
@CheckReturnValue
public Single<ApplicationInfo> getCurrentApplicationInformation() {
    return Single.fromObservable(getCurrentApplicationInformationRaw().map(entityBuilder()::createApplicationInfo));
}
 
源代码18 项目: catnip   文件: RestChannel.java
@Nonnull
@CheckReturnValue
public Single<Channel> deleteChannel(@Nonnull final String channelId, @Nullable final String reason) {
    return Single.fromObservable(deleteChannelRaw(channelId, reason).map(entityBuilder()::createChannel));
}
 
源代码19 项目: RxBasicsKata   文件: CountriesServiceSolved.java
@Override
public Single<Boolean> areEmittingSameSequences(Observable<Country> countryObservable1,
                                                Observable<Country> countryObservable2) {
    return null; // put your solution here
}
 
源代码20 项目: armeria   文件: MainService.java
@Override
public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) {
    final Scheduler contextAwareScheduler = Schedulers.from(ctx.contextAwareExecutor());

    // This logic mimics using a blocking method, which would usually be something like a MySQL
    // database query using JDBC.
    final Flowable<Long> fetchNumsFromFakeDb =
            Single.fromCallable(
                    () -> {
                        // The context is mounted in a thread-local, meaning it is available to all
                        // logic such as tracing.
                        checkState(ServiceRequestContext.current() == ctx);
                        checkState(!ctx.eventLoop().inEventLoop());

                        Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(50));
                        return ImmutableList.of(23L, -23L);
                    })
                  // Always run blocking logic on the blocking task executor. By using
                  // ServiceRequestContext.blockingTaskExecutor, you also ensure the context is mounted
                  // inside the logic (e.g., your DB call will be traced!).
                  .subscribeOn(Schedulers.from(ctx.blockingTaskExecutor()))
                  .flattenAsFlowable(l -> l);

    final Flowable<Long> extractNumsFromRequest =
            Single.fromCompletionStage(req.aggregate())
                           // Unless you know what you're doing, always use subscribeOn with the context
                           // executor to have the context mounted and stay on a single thread to reduce
                           // concurrency issues.
                           .subscribeOn(contextAwareScheduler)
                           .flatMapPublisher(request -> {
                               // The context is mounted in a thread-local, meaning it is available to all
                               // logic such as tracing.
                               checkState(ServiceRequestContext.current() == ctx);
                               checkState(ctx.eventLoop().inEventLoop());

                               final List<Long> nums = new ArrayList<>();
                               for (String token : Iterables.concat(
                                       NUM_SPLITTER.split(request.path().substring(1)),
                                       NUM_SPLITTER.split(request.contentUtf8()))) {
                                   nums.add(Long.parseLong(token));
                               }

                               return Flowable.fromIterable(nums);
                           });

    final Single<HttpResponse> response =
            Flowable.concatArrayEager(extractNumsFromRequest, fetchNumsFromFakeDb)
                    // Unless you know what you're doing, always use subscribeOn with the context executor
                    // to have the context mounted and stay on a single thread to reduce concurrency issues.
                    .subscribeOn(contextAwareScheduler)
                    // When concatenating flowables, you should almost always call observeOn with the
                    // context executor because we don't know here whether the subscription is on it or
                    // something like a blocking task executor.
                    .observeOn(contextAwareScheduler)
                    .flatMapSingle(num -> {
                        // The context is mounted in a thread-local, meaning it is available to all logic
                        // such as tracing.
                        checkState(ServiceRequestContext.current() == ctx);
                        checkState(ctx.eventLoop().inEventLoop());

                        return Single.fromCompletionStage(backendClient.get("/square/" + num).aggregate());
                    })
                    .map(AggregatedHttpResponse::contentUtf8)
                    .collectInto(new StringBuilder(), (current, item) -> current.append(item).append('\n'))
                    .map(content -> HttpResponse.of(content.toString()))
                    .onErrorReturn(HttpResponse::ofFailure);

    return HttpResponse.from(response.toCompletionStage());
}
 
源代码21 项目: catnip   文件: RestChannel.java
@Nonnull
public Single<GuildChannel> modifyChannel(@Nonnull final String channelId,
                                          @Nonnull final ChannelEditFields fields,
                                          @Nullable final String reason) {
    return Single.fromObservable(modifyChannelRaw(channelId, fields, reason).map(entityBuilder()::createGuildChannel));
}
 
源代码22 项目: catnip   文件: RestGuild.java
@Nonnull
@CheckReturnValue
public Single<Guild> getGuild(@Nonnull final String guildId) {
    return Single.fromObservable(getGuildRaw(guildId).map(entityBuilder()::createGuild));
}
 
@Override
public Single<com.github.lightningnetwork.lnd.lnrpc.UnlockWalletResponse> unlockWallet(com.github.lightningnetwork.lnd.lnrpc.UnlockWalletRequest request) {
    return DefaultSingle.createDefault(emitter -> asyncStub.unlockWallet(request, new RemoteLndSingleObserver<>(emitter)));
}
 
源代码24 项目: catnip   文件: MemoryEntityCache.java
@Nonnull
@Override
public Single<User> selfUserAsync() {
    return or("self user", selfUser());
}
 
源代码25 项目: catnip   文件: RestChannel.java
@Nonnull
public Single<Message> editMessage(@Nonnull final String channelId, @Nonnull final String messageId,
                                   @Nonnull final String content) {
    return editMessage(channelId, messageId, new MessageOptions().content(content).buildMessage());
}
 
源代码26 项目: catnip   文件: RestChannel.java
@Nonnull
public Single<Message> editMessage(@Nonnull final String channelId, @Nonnull final String messageId,
                                   @Nonnull final Message message) {
    return editMessage(channelId, messageId, message, Set.of());
}
 
源代码27 项目: zap-android   文件: RemoteLndLightningService.java
@Override
public Single<com.github.lightningnetwork.lnd.lnrpc.WalletBalanceResponse> walletBalance(com.github.lightningnetwork.lnd.lnrpc.WalletBalanceRequest request) {
    return DefaultSingle.createDefault(emitter -> asyncStub.walletBalance(request, new RemoteLndSingleObserver<>(emitter)));
}
 
源代码28 项目: catnip   文件: MemoryEntityCache.java
@Nonnull
@Override
public Single<UserDMChannel> dmChannelAsync(final long id) {
    return or(id, dmChannel(id));
}
 
源代码29 项目: catnip   文件: RestChannel.java
@Nonnull
@CheckReturnValue
public Single<Channel> deleteChannel(@Nonnull final String channelId) {
    return deleteChannel(channelId, null);
}
 
源代码30 项目: zap-android   文件: RemoteLndLightningService.java
@Override
public Single<com.github.lightningnetwork.lnd.lnrpc.ListUnspentResponse> listUnspent(com.github.lightningnetwork.lnd.lnrpc.ListUnspentRequest request) {
    return DefaultSingle.createDefault(emitter -> asyncStub.listUnspent(request, new RemoteLndSingleObserver<>(emitter)));
}
 
 类所在包
 同包方法