类io.reactivex.rxjava3.core.Maybe源码实例Demo

下面列出了怎么用io.reactivex.rxjava3.core.Maybe的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: java-11-examples   文件: DataServiceTest.java
@Test
public void testMaybe() throws InterruptedException {
    DataService dataService = new DataServiceImpl(executor);
    Maybe<DataItem> maybe = dataService.getMaybe(new SingleDataQuery("maybe-query"));
    SynchronousMaybeObserver maybeObserver = new SynchronousMaybeObserver();

    maybe.subscribe(maybeObserver);
    maybeObserver.await(10, TimeUnit.SECONDS);

    Assert.assertTrue(maybeObserver.isCompleted());
    Assert.assertFalse(maybeObserver.hasErrors());
    Assert.assertNotNull(maybeObserver.getDataItem());
    Assert.assertEquals(maybeObserver.getDataItem().getRequest(), "maybe-query");
    Assert.assertEquals(maybeObserver.getDataItem().getResult(), "maybe-data-result");
    Assert.assertTrue(maybeObserver.getDataItem().getOrdinal() == 1);
}
 
源代码2 项目: RxCache   文件: CacheAndRemoteStrategy.java
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {

    Maybe<Record<T>> cache = rxCache.<T>load2Maybe(key, type);

    Maybe<Record<T>> remote = source
            .map(new Function<T, Record<T>>() {
                @Override
                public Record<T> apply(@NonNull T t) throws Exception {

                    rxCache.save(key, t);

                    return new Record<>(Source.CLOUD, key, t);
                }
            });

    return Maybe.concatDelayError(Arrays.asList(cache,remote))
            .filter(new Predicate<Record<T>>() {
                @Override
                public boolean test(@NonNull Record<T> record) throws Exception {
                    return record.getData() != null;
                }
            })
            .firstElement();
}
 
源代码3 项目: RxCache   文件: RemoteFirstStrategy.java
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {

    Maybe<Record<T>> cache = rxCache.<T>load2Maybe(key, type);

    Maybe<Record<T>> remote = source
            .map(new Function<T, Record<T>>() {
                @Override
                public Record<T> apply(@NonNull T t) throws Exception {

                    rxCache.save(key, t);

                    return new Record<>(Source.CLOUD, key, t);
                }
            });

    return remote.switchIfEmpty(cache);
}
 
源代码4 项目: RxCache   文件: CacheFirstStrategy.java
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {

    Maybe<Record<T>> cache = rxCache.<T>load2Maybe(key, type);

    Maybe<Record<T>> remote = source
            .map(new Function<T, Record<T>>() {
                @Override
                public Record<T> apply(@NonNull T t) throws Exception {

                    rxCache.save(key, t);

                    return new Record<>(Source.CLOUD, key, t);
                }
            });

    return cache.switchIfEmpty(remote);
}
 
源代码5 项目: RxCache   文件: RemoteOnlyStrategy.java
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {

    Maybe<Record<T>> remote = source
            .map(new Function<T, Record<T>>() {
                @Override
                public Record<T> apply(@NonNull T t) throws Exception {

                    rxCache.save(key, t);

                    return new Record<>(Source.CLOUD, key, t);
                }
            });

    return remote;
}
 
源代码6 项目: RxCache   文件: CacheFirstTimeoutStrategy.java
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {

    Maybe<Record<T>> cache = rxCache.<T>load2Maybe(key, type)
            .filter(new Predicate<Record<T>>() {
                @Override
                public boolean test(Record<T> record) throws Exception {
                    return System.currentTimeMillis() - record.getCreateTime() <= timestamp;
                }
            });

    Maybe<Record<T>> remote = source
            .map(new Function<T, Record<T>>() {
                @Override
                public Record<T> apply(@NonNull T t) throws Exception {

                    rxCache.save(key, t);

                    return new Record<>(Source.CLOUD, key, t);
                }
            });

    return cache.switchIfEmpty(remote);
}
 
@Override
public Maybe<User> findById(Integer id) {
    return Maybe.fromCallable(() -> {
        User user = new User();
        user.setId(id);
        user.setNick(faker.name().name());
        user.setPhone(faker.phoneNumber().cellPhone());
        user.setEmail(faker.internet().emailAddress());
        return user;
    });
}
 
源代码8 项目: RxCache   文件: NoCacheStrategy.java
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {

    return source.map(new Function<T, Record<T>>() {
        @Override
        public Record<T> apply(@NonNull T t) throws Exception {
            return new Record<>(Source.CLOUD, key, t);
        }
    });
}
 
源代码9 项目: resilience4j   文件: MaybeCircuitBreakerTest.java
@Test
public void shouldSubscribeToMaybeJust() {
    given(circuitBreaker.tryAcquirePermission()).willReturn(true);

    Maybe.just(1)
        .compose(CircuitBreakerOperator.of(circuitBreaker))
        .test()
        .assertResult(1);

    then(circuitBreaker).should().onSuccess(anyLong(), any(TimeUnit.class));
    then(circuitBreaker).should(never())
        .onError(anyLong(), any(TimeUnit.class), any(Throwable.class));
}
 
源代码10 项目: resilience4j   文件: MaybeCircuitBreakerTest.java
@Test
public void shouldPropagateError() {
    given(circuitBreaker.tryAcquirePermission()).willReturn(true);

    Maybe.error(new IOException("BAM!"))
        .compose(CircuitBreakerOperator.of(circuitBreaker))
        .test()
        .assertError(IOException.class)
        .assertNotComplete();

    then(circuitBreaker).should()
        .onError(anyLong(), any(TimeUnit.class), any(IOException.class));
    then(circuitBreaker).should(never()).onSuccess(anyLong(), any(TimeUnit.class));
}
 
源代码11 项目: resilience4j   文件: MaybeCircuitBreakerTest.java
@Test
public void shouldEmitErrorWithCallNotPermittedException() {
    given(circuitBreaker.tryAcquirePermission()).willReturn(false);

    Maybe.just(1)
        .compose(CircuitBreakerOperator.of(circuitBreaker))
        .test()
        .assertError(CallNotPermittedException.class)
        .assertNotComplete();

    then(circuitBreaker).should(never()).onSuccess(anyLong(), any(TimeUnit.class));
    then(circuitBreaker).should(never())
        .onError(anyLong(), any(TimeUnit.class), any(Throwable.class));
}
 
源代码12 项目: resilience4j   文件: MaybeCircuitBreakerTest.java
@Test
public void shouldReleasePermissionOnCancel() {
    given(circuitBreaker.tryAcquirePermission()).willReturn(true);

    Maybe.just(1)
        .delay(1, TimeUnit.DAYS)
        .compose(CircuitBreakerOperator.of(circuitBreaker))
        .test()
        .dispose();

    then(circuitBreaker).should().releasePermission();
    then(circuitBreaker).should(never())
        .onError(anyLong(), any(TimeUnit.class), any(Throwable.class));
    then(circuitBreaker).should(never()).onSuccess(anyLong(), any(TimeUnit.class));
}
 
源代码13 项目: resilience4j   文件: MaybeBulkheadTest.java
@Test
public void shouldEmitAllEvents() {
    given(bulkhead.tryAcquirePermission()).willReturn(true);

    Maybe.just(1)
        .compose(BulkheadOperator.of(bulkhead))
        .test()
        .assertResult(1);

    verify(bulkhead).onComplete();
}
 
源代码14 项目: resilience4j   文件: MaybeBulkheadTest.java
@Test
public void shouldPropagateError() {
    given(bulkhead.tryAcquirePermission()).willReturn(true);

    Maybe.error(new IOException("BAM!"))
        .compose(BulkheadOperator.of(bulkhead))
        .test()
        .assertError(IOException.class)
        .assertNotComplete();

    verify(bulkhead).onComplete();
}
 
源代码15 项目: resilience4j   文件: MaybeBulkheadTest.java
@Test
public void shouldEmitErrorWithBulkheadFullException() {
    given(bulkhead.tryAcquirePermission()).willReturn(false);

    Maybe.just(1)
        .compose(BulkheadOperator.of(bulkhead))
        .test()
        .assertError(BulkheadFullException.class)
        .assertNotComplete();

    verify(bulkhead, never()).onComplete();
}
 
源代码16 项目: resilience4j   文件: MaybeBulkheadTest.java
@Test
public void shouldReleaseBulkheadOnlyOnce() {
    given(bulkhead.tryAcquirePermission()).willReturn(true);

    Maybe.just(Arrays.asList(1, 2, 3))
        .compose(BulkheadOperator.of(bulkhead))
        .flatMapObservable(Observable::fromIterable)
        .take(2) //this with the previous line triggers an extra dispose
        .test()
        .assertResult(1, 2);

    verify(bulkhead).onComplete();
}
 
@Test
public void otherError() {
    given(timeLimiter.getTimeLimiterConfig())
        .willReturn(toConfig(Duration.ZERO));
    TestObserver<?> observer = Maybe.error(new RuntimeException())
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test();

    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    observer.assertError(RuntimeException.class);
    then(timeLimiter).should()
        .onError(any(RuntimeException.class));
}
 
@Test
public void timeoutEmpty() {
    given(timeLimiter.getTimeLimiterConfig())
        .willReturn(toConfig(Duration.ZERO));
    TestObserver<?> observer = Maybe.empty()
        .delay(1, TimeUnit.MINUTES)
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test();

    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    observer.assertError(TimeoutException.class);
    then(timeLimiter).should()
        .onError(any(TimeoutException.class));
}
 
@Test
public void doNotTimeoutEmpty() {
    given(timeLimiter.getTimeLimiterConfig())
        .willReturn(toConfig(Duration.ofMinutes(1)));
    TestObserver<?> observer = Maybe.empty()
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test();

    observer.assertComplete();
    then(timeLimiter).should()
        .onSuccess();
}
 
@Test
public void timeout() {
    given(timeLimiter.getTimeLimiterConfig())
        .willReturn(toConfig(Duration.ZERO));
    TestObserver<?> observer = Maybe.timer(1, TimeUnit.MINUTES)
        .flatMapCompletable(t -> Completable.complete())
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test();

    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    observer.assertError(TimeoutException.class);
    then(timeLimiter).should()
        .onError(any(TimeoutException.class));
}
 
源代码21 项目: resilience4j   文件: MaybeRateLimiterTest.java
@Test
public void shouldEmitEvent() {
    given(rateLimiter.reservePermission()).willReturn(Duration.ofSeconds(0).toNanos());

    Maybe.just(1)
        .compose(RateLimiterOperator.of(rateLimiter))
        .test()
        .assertResult(1);
}
 
源代码22 项目: resilience4j   文件: MaybeRateLimiterTest.java
@Test
public void shouldDelaySubscription() {
    given(rateLimiter.reservePermission()).willReturn(Duration.ofSeconds(1).toNanos());

    Maybe.just(1)
        .compose(RateLimiterOperator.of(rateLimiter))
        .test()
        .awaitDone(2, TimeUnit.SECONDS);
}
 
源代码23 项目: resilience4j   文件: MaybeRateLimiterTest.java
@Test
public void shouldPropagateError() {
    given(rateLimiter.reservePermission()).willReturn(Duration.ofSeconds(0).toNanos());

    Maybe.error(new IOException("BAM!"))
        .compose(RateLimiterOperator.of(rateLimiter))
        .test()
        .assertError(IOException.class)
        .assertNotComplete();
}
 
源代码24 项目: resilience4j   文件: MaybeRateLimiterTest.java
@Test
public void shouldEmitErrorWithRequestNotPermittedException() {
    given(rateLimiter.reservePermission()).willReturn(-1L);

    Maybe.just(1)
        .compose(RateLimiterOperator.of(rateLimiter))
        .test()
        .assertError(RequestNotPermitted.class)
        .assertNotComplete();
}
 
/**
 * Returns {@code true} if the specified {@link Class} can be handled by the
 * {@link ObservableResponseConverterFunction}.
 */
private static boolean isSupportedClass(Class<?> clazz) {
    return Observable.class.isAssignableFrom(clazz) ||
           Maybe.class.isAssignableFrom(clazz) ||
           Single.class.isAssignableFrom(clazz) ||
           Completable.class.isAssignableFrom(clazz);
}
 
源代码26 项目: armeria   文件: RequestContextAssemblyTest.java
private static Maybe<String> maybe(String input) {
    RequestContext.current();
    return Maybe.create(emitter -> {
        RequestContext.current();
        emitter.onSuccess(input);
    });
}
 
@GetMapping("/user/{id}")
public Maybe<User> user(@PathVariable Integer id) {
    return rx3UserService.findById(id);
}
 
源代码28 项目: java-11-examples   文件: DataServiceImpl.java
@Override
public Maybe<DataItem> getMaybe(SingleDataQuery dataQuery) {
    return Maybe.create(new MaybeOnSubscribeDataProducer(executor, dataQuery));
}
 
源代码29 项目: RxCache   文件: Provider.java
@CacheKey("test")
@CacheMethod(methodType = MethodType.GET)
<T> Maybe<Record<T>> getMaybe(@CacheClass Class<T> clazz);
 
源代码30 项目: RxCache   文件: CacheProxy.java
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

    CacheMethod cacheMethod = method.getAnnotation(CacheMethod.class);
    CacheKey cacheKey = method.getAnnotation(CacheKey.class);
    CacheLifecycle cacheLifecycle = method.getAnnotation(CacheLifecycle.class);

    Annotation[][] allParamsAnnotations = method.getParameterAnnotations();

    Class cacheClazz = null;
    Object cacheValue = null;

    if (allParamsAnnotations != null) {
        for (int i = 0; i < allParamsAnnotations.length; i++) {
            Annotation[] paramAnnotations = allParamsAnnotations[i];
            if (paramAnnotations != null) {
                for (Annotation annotation : paramAnnotations) {
                    if (annotation instanceof CacheClass) {
                        cacheClazz = (Class) args[i];
                    }

                    if (annotation instanceof CacheValue) {
                        cacheValue = args[i];
                    }
                }
            }
        }
    }

    if (cacheMethod!=null) {

        MethodType methodType = cacheMethod.methodType();

        long duration = -1;

        if (cacheLifecycle != null) {
            duration = cacheLifecycle.duration();
        }

        if (methodType == MethodType.GET) {

            Class returnClazz = method.getReturnType();

            if (returnClazz!=null) {

                if (Observable.class.isAssignableFrom(returnClazz)) {

                    return rxCache.load2Observable(cacheKey.value(),cacheClazz);
                } else if (Flowable.class.isAssignableFrom(returnClazz)) {

                    return rxCache.load2Flowable(cacheKey.value(),cacheClazz);
                } else if (Single.class.isAssignableFrom(returnClazz)) {

                    return rxCache.load2Single(cacheKey.value(),cacheClazz);
                } else if (Maybe.class.isAssignableFrom(returnClazz)) {

                    return rxCache.load2Maybe(cacheKey.value(),cacheClazz);
                } else if (Record.class.isAssignableFrom(returnClazz)) {

                    return rxCache.get(cacheKey.value(),cacheClazz);
                }
            }

        } else if (methodType == MethodType.SAVE) {

            rxCache.save(cacheKey.value(),cacheValue,duration);

        } else if (methodType == MethodType.REMOVE) {

            rxCache.remove(cacheKey.value());
        }
    }

    return null;
}
 
 类所在包
 类方法
 同包方法