下面列出了io.reactivex.rxjava3.annotations.NonNull#io.reactivex.rxjava3.core.Maybe 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testMaybe() throws InterruptedException {
DataService dataService = new DataServiceImpl(executor);
Maybe<DataItem> maybe = dataService.getMaybe(new SingleDataQuery("maybe-query"));
SynchronousMaybeObserver maybeObserver = new SynchronousMaybeObserver();
maybe.subscribe(maybeObserver);
maybeObserver.await(10, TimeUnit.SECONDS);
Assert.assertTrue(maybeObserver.isCompleted());
Assert.assertFalse(maybeObserver.hasErrors());
Assert.assertNotNull(maybeObserver.getDataItem());
Assert.assertEquals(maybeObserver.getDataItem().getRequest(), "maybe-query");
Assert.assertEquals(maybeObserver.getDataItem().getResult(), "maybe-data-result");
Assert.assertTrue(maybeObserver.getDataItem().getOrdinal() == 1);
}
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
Maybe<Record<T>> cache = rxCache.<T>load2Maybe(key, type);
Maybe<Record<T>> remote = source
.map(new Function<T, Record<T>>() {
@Override
public Record<T> apply(@NonNull T t) throws Exception {
rxCache.save(key, t);
return new Record<>(Source.CLOUD, key, t);
}
});
return Maybe.concatDelayError(Arrays.asList(cache,remote))
.filter(new Predicate<Record<T>>() {
@Override
public boolean test(@NonNull Record<T> record) throws Exception {
return record.getData() != null;
}
})
.firstElement();
}
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
Maybe<Record<T>> cache = rxCache.<T>load2Maybe(key, type);
Maybe<Record<T>> remote = source
.map(new Function<T, Record<T>>() {
@Override
public Record<T> apply(@NonNull T t) throws Exception {
rxCache.save(key, t);
return new Record<>(Source.CLOUD, key, t);
}
});
return remote.switchIfEmpty(cache);
}
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
Maybe<Record<T>> cache = rxCache.<T>load2Maybe(key, type);
Maybe<Record<T>> remote = source
.map(new Function<T, Record<T>>() {
@Override
public Record<T> apply(@NonNull T t) throws Exception {
rxCache.save(key, t);
return new Record<>(Source.CLOUD, key, t);
}
});
return cache.switchIfEmpty(remote);
}
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
Maybe<Record<T>> remote = source
.map(new Function<T, Record<T>>() {
@Override
public Record<T> apply(@NonNull T t) throws Exception {
rxCache.save(key, t);
return new Record<>(Source.CLOUD, key, t);
}
});
return remote;
}
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
Maybe<Record<T>> cache = rxCache.<T>load2Maybe(key, type)
.filter(new Predicate<Record<T>>() {
@Override
public boolean test(Record<T> record) throws Exception {
return System.currentTimeMillis() - record.getCreateTime() <= timestamp;
}
});
Maybe<Record<T>> remote = source
.map(new Function<T, Record<T>>() {
@Override
public Record<T> apply(@NonNull T t) throws Exception {
rxCache.save(key, t);
return new Record<>(Source.CLOUD, key, t);
}
});
return cache.switchIfEmpty(remote);
}
@Override
public Maybe<User> findById(Integer id) {
return Maybe.fromCallable(() -> {
User user = new User();
user.setId(id);
user.setNick(faker.name().name());
user.setPhone(faker.phoneNumber().cellPhone());
user.setEmail(faker.internet().emailAddress());
return user;
});
}
@Override
public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {
return source.map(new Function<T, Record<T>>() {
@Override
public Record<T> apply(@NonNull T t) throws Exception {
return new Record<>(Source.CLOUD, key, t);
}
});
}
@Test
public void shouldSubscribeToMaybeJust() {
given(circuitBreaker.tryAcquirePermission()).willReturn(true);
Maybe.just(1)
.compose(CircuitBreakerOperator.of(circuitBreaker))
.test()
.assertResult(1);
then(circuitBreaker).should().onSuccess(anyLong(), any(TimeUnit.class));
then(circuitBreaker).should(never())
.onError(anyLong(), any(TimeUnit.class), any(Throwable.class));
}
@Test
public void shouldPropagateError() {
given(circuitBreaker.tryAcquirePermission()).willReturn(true);
Maybe.error(new IOException("BAM!"))
.compose(CircuitBreakerOperator.of(circuitBreaker))
.test()
.assertError(IOException.class)
.assertNotComplete();
then(circuitBreaker).should()
.onError(anyLong(), any(TimeUnit.class), any(IOException.class));
then(circuitBreaker).should(never()).onSuccess(anyLong(), any(TimeUnit.class));
}
@Test
public void shouldEmitErrorWithCallNotPermittedException() {
given(circuitBreaker.tryAcquirePermission()).willReturn(false);
Maybe.just(1)
.compose(CircuitBreakerOperator.of(circuitBreaker))
.test()
.assertError(CallNotPermittedException.class)
.assertNotComplete();
then(circuitBreaker).should(never()).onSuccess(anyLong(), any(TimeUnit.class));
then(circuitBreaker).should(never())
.onError(anyLong(), any(TimeUnit.class), any(Throwable.class));
}
@Test
public void shouldReleasePermissionOnCancel() {
given(circuitBreaker.tryAcquirePermission()).willReturn(true);
Maybe.just(1)
.delay(1, TimeUnit.DAYS)
.compose(CircuitBreakerOperator.of(circuitBreaker))
.test()
.dispose();
then(circuitBreaker).should().releasePermission();
then(circuitBreaker).should(never())
.onError(anyLong(), any(TimeUnit.class), any(Throwable.class));
then(circuitBreaker).should(never()).onSuccess(anyLong(), any(TimeUnit.class));
}
@Test
public void shouldEmitAllEvents() {
given(bulkhead.tryAcquirePermission()).willReturn(true);
Maybe.just(1)
.compose(BulkheadOperator.of(bulkhead))
.test()
.assertResult(1);
verify(bulkhead).onComplete();
}
@Test
public void shouldPropagateError() {
given(bulkhead.tryAcquirePermission()).willReturn(true);
Maybe.error(new IOException("BAM!"))
.compose(BulkheadOperator.of(bulkhead))
.test()
.assertError(IOException.class)
.assertNotComplete();
verify(bulkhead).onComplete();
}
@Test
public void shouldEmitErrorWithBulkheadFullException() {
given(bulkhead.tryAcquirePermission()).willReturn(false);
Maybe.just(1)
.compose(BulkheadOperator.of(bulkhead))
.test()
.assertError(BulkheadFullException.class)
.assertNotComplete();
verify(bulkhead, never()).onComplete();
}
@Test
public void shouldReleaseBulkheadOnlyOnce() {
given(bulkhead.tryAcquirePermission()).willReturn(true);
Maybe.just(Arrays.asList(1, 2, 3))
.compose(BulkheadOperator.of(bulkhead))
.flatMapObservable(Observable::fromIterable)
.take(2) //this with the previous line triggers an extra dispose
.test()
.assertResult(1, 2);
verify(bulkhead).onComplete();
}
@Test
public void otherError() {
given(timeLimiter.getTimeLimiterConfig())
.willReturn(toConfig(Duration.ZERO));
TestObserver<?> observer = Maybe.error(new RuntimeException())
.compose(TimeLimiterTransformer.of(timeLimiter))
.test();
testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);
observer.assertError(RuntimeException.class);
then(timeLimiter).should()
.onError(any(RuntimeException.class));
}
@Test
public void timeoutEmpty() {
given(timeLimiter.getTimeLimiterConfig())
.willReturn(toConfig(Duration.ZERO));
TestObserver<?> observer = Maybe.empty()
.delay(1, TimeUnit.MINUTES)
.compose(TimeLimiterTransformer.of(timeLimiter))
.test();
testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);
observer.assertError(TimeoutException.class);
then(timeLimiter).should()
.onError(any(TimeoutException.class));
}
@Test
public void doNotTimeoutEmpty() {
given(timeLimiter.getTimeLimiterConfig())
.willReturn(toConfig(Duration.ofMinutes(1)));
TestObserver<?> observer = Maybe.empty()
.compose(TimeLimiterTransformer.of(timeLimiter))
.test();
observer.assertComplete();
then(timeLimiter).should()
.onSuccess();
}
@Test
public void timeout() {
given(timeLimiter.getTimeLimiterConfig())
.willReturn(toConfig(Duration.ZERO));
TestObserver<?> observer = Maybe.timer(1, TimeUnit.MINUTES)
.flatMapCompletable(t -> Completable.complete())
.compose(TimeLimiterTransformer.of(timeLimiter))
.test();
testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);
observer.assertError(TimeoutException.class);
then(timeLimiter).should()
.onError(any(TimeoutException.class));
}
@Test
public void shouldEmitEvent() {
given(rateLimiter.reservePermission()).willReturn(Duration.ofSeconds(0).toNanos());
Maybe.just(1)
.compose(RateLimiterOperator.of(rateLimiter))
.test()
.assertResult(1);
}
@Test
public void shouldDelaySubscription() {
given(rateLimiter.reservePermission()).willReturn(Duration.ofSeconds(1).toNanos());
Maybe.just(1)
.compose(RateLimiterOperator.of(rateLimiter))
.test()
.awaitDone(2, TimeUnit.SECONDS);
}
@Test
public void shouldPropagateError() {
given(rateLimiter.reservePermission()).willReturn(Duration.ofSeconds(0).toNanos());
Maybe.error(new IOException("BAM!"))
.compose(RateLimiterOperator.of(rateLimiter))
.test()
.assertError(IOException.class)
.assertNotComplete();
}
@Test
public void shouldEmitErrorWithRequestNotPermittedException() {
given(rateLimiter.reservePermission()).willReturn(-1L);
Maybe.just(1)
.compose(RateLimiterOperator.of(rateLimiter))
.test()
.assertError(RequestNotPermitted.class)
.assertNotComplete();
}
/**
* Returns {@code true} if the specified {@link Class} can be handled by the
* {@link ObservableResponseConverterFunction}.
*/
private static boolean isSupportedClass(Class<?> clazz) {
return Observable.class.isAssignableFrom(clazz) ||
Maybe.class.isAssignableFrom(clazz) ||
Single.class.isAssignableFrom(clazz) ||
Completable.class.isAssignableFrom(clazz);
}
private static Maybe<String> maybe(String input) {
RequestContext.current();
return Maybe.create(emitter -> {
RequestContext.current();
emitter.onSuccess(input);
});
}
@GetMapping("/user/{id}")
public Maybe<User> user(@PathVariable Integer id) {
return rx3UserService.findById(id);
}
@Override
public Maybe<DataItem> getMaybe(SingleDataQuery dataQuery) {
return Maybe.create(new MaybeOnSubscribeDataProducer(executor, dataQuery));
}
@CacheKey("test")
@CacheMethod(methodType = MethodType.GET)
<T> Maybe<Record<T>> getMaybe(@CacheClass Class<T> clazz);
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
CacheMethod cacheMethod = method.getAnnotation(CacheMethod.class);
CacheKey cacheKey = method.getAnnotation(CacheKey.class);
CacheLifecycle cacheLifecycle = method.getAnnotation(CacheLifecycle.class);
Annotation[][] allParamsAnnotations = method.getParameterAnnotations();
Class cacheClazz = null;
Object cacheValue = null;
if (allParamsAnnotations != null) {
for (int i = 0; i < allParamsAnnotations.length; i++) {
Annotation[] paramAnnotations = allParamsAnnotations[i];
if (paramAnnotations != null) {
for (Annotation annotation : paramAnnotations) {
if (annotation instanceof CacheClass) {
cacheClazz = (Class) args[i];
}
if (annotation instanceof CacheValue) {
cacheValue = args[i];
}
}
}
}
}
if (cacheMethod!=null) {
MethodType methodType = cacheMethod.methodType();
long duration = -1;
if (cacheLifecycle != null) {
duration = cacheLifecycle.duration();
}
if (methodType == MethodType.GET) {
Class returnClazz = method.getReturnType();
if (returnClazz!=null) {
if (Observable.class.isAssignableFrom(returnClazz)) {
return rxCache.load2Observable(cacheKey.value(),cacheClazz);
} else if (Flowable.class.isAssignableFrom(returnClazz)) {
return rxCache.load2Flowable(cacheKey.value(),cacheClazz);
} else if (Single.class.isAssignableFrom(returnClazz)) {
return rxCache.load2Single(cacheKey.value(),cacheClazz);
} else if (Maybe.class.isAssignableFrom(returnClazz)) {
return rxCache.load2Maybe(cacheKey.value(),cacheClazz);
} else if (Record.class.isAssignableFrom(returnClazz)) {
return rxCache.get(cacheKey.value(),cacheClazz);
}
}
} else if (methodType == MethodType.SAVE) {
rxCache.save(cacheKey.value(),cacheValue,duration);
} else if (methodType == MethodType.REMOVE) {
rxCache.remove(cacheKey.value());
}
}
return null;
}