下面列出了 io.reactivex.Maybe#fromCallable() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Prepares the fixture before each test: initializes the mocks, creates the
 * {@code ImpressionStorageClient} under test, and stubs the mocked storage client so that
 * reads and writes return canned reactive results.
 *
 * @throws IOException declared by the signature; nothing in this body is seen to throw it
 */
@Before
public void setup() throws IOException {
    initMocks(this);
    impressionStorageClient = new ImpressionStorageClient(storageClient);

    // Clear the flag first; it only flips to true once fakeWrite is actually subscribed to.
    wasWritten = false;
    fakeWrite = Completable.fromAction(() -> wasWritten = true);

    // Evaluated lazily on subscription, so it reflects campaignImpressionList at that moment.
    fakeRead = Maybe.fromCallable(() -> campaignImpressionList);

    when(storageClient.read(any(CampaignImpressionsParser.class))).thenReturn(fakeRead);
    when(storageClient.write(any(CampaignImpressionList.class))).thenReturn(fakeWrite);
}
/**
 * Prepares the fixture before each test: initializes the mocks, builds a stored
 * {@code RateLimit} proto containing a single limit keyed by {@code LIMITER_KEY}, creates the
 * {@code RateLimiterClient} under test with a fake clock fixed at {@code NOW}, and stubs the
 * mocked storage client with canned read and write results.
 *
 * @throws IOException declared by the signature; nothing in this body is seen to throw it
 */
@Before
public void setup() throws IOException {
    initMocks(this);
    rateLimiterClient = new RateLimiterClient(storageClient, new FakeClock(NOW));

    // The proto that the fake read hands back.
    limitsMap = new HashMap<>();
    limitsMap.put(LIMITER_KEY, counter);
    storedRateLimit = RateLimitProto.RateLimit.newBuilder().putAllLimits(limitsMap).build();

    // Subscribing to fakeWrite records that a write was requested.
    fakeWrite = Completable.fromAction(() -> wasWritten = true);
    // Evaluated lazily on subscription, so it reflects storedRateLimit at that moment.
    fakeRead = Maybe.fromCallable(() -> storedRateLimit);

    when(storageClient.read(any(RateLimitParser.class))).thenReturn(fakeRead);
    when(storageClient.write(any(RateLimitProto.RateLimit.class))).thenReturn(fakeWrite);
}
/**
 * Produces a fake {@code User} carrying the requested id and faker-generated nick, phone and
 * email. The user is built lazily, each time the returned {@link Maybe} is subscribed to.
 *
 * @param id the id to assign to the generated user
 * @return a {@link Maybe} emitting the generated user
 */
@Override
public Maybe<User> findById(Integer id) {
    return Maybe.fromCallable(() -> {
        // Generate the fake values in a fixed order: name, phone, e-mail.
        String nick = faker.name().name();
        String phone = faker.phoneNumber().cellPhone();
        String email = faker.internet().emailAddress();

        User generated = new User();
        generated.setId(id);
        generated.setNick(nick);
        generated.setPhone(phone);
        generated.setEmail(email);
        return generated;
    });
}
/**
 * Produces a canned {@code User} with id {@code 1} and nick {@code "nick1"} that echoes back
 * the supplied email and phone. The user is built lazily, each time the returned
 * {@link Maybe} is subscribed to.
 *
 * @param email the email to set on the returned user
 * @param phone the phone number to set on the returned user
 * @return a {@link Maybe} emitting the canned user
 */
@Override
public Maybe<User> findByEmailOrPhone(String email, String phone) {
    return Maybe.fromCallable(() -> {
        User stub = new User();
        // Fixed identity for the stub.
        stub.setId(1);
        stub.setNick("nick1");
        // Contact details echo the caller's arguments.
        stub.setEmail(email);
        stub.setPhone(phone);
        return stub;
    });
}
/**
 * Prepares the fixture before each test: initializes the mocks, clears the write flag, creates
 * the {@code CampaignCacheClient} under test with a fake clock fixed at {@code NOW}, and
 * creates the test observer plus the canned read/write sources. No stubbing is done here.
 *
 * @throws IOException declared by the signature; nothing in this body is seen to throw it
 */
@Before
public void setup() throws IOException {
    initMocks(this);
    wasWritten = false;

    Application testApplication =
        (Application) RuntimeEnvironment.application.getApplicationContext();
    campaignCacheClient =
        new CampaignCacheClient(storageClient, testApplication, new FakeClock(NOW));

    storageWriteObserver = TestObserver.create();
    // Evaluated lazily on subscription, so it reflects fetchEligibleCampaignsResponse1 then.
    fakeRead = Maybe.fromCallable(() -> fetchEligibleCampaignsResponse1);
    // Subscribing to fakeWrite records that a write was requested.
    fakeWrite = Completable.fromAction(() -> wasWritten = true);
}
/**
 * Wraps {@code query(queryable)} in a {@link Maybe}. The query is not run by this call; it is
 * executed each time the returned {@link Maybe} is subscribed to.
 *
 * @param queryable the query description passed through to {@code query}
 * @return a {@link Maybe} emitting the {@code Cursor} returned by {@code query}; it completes
 *     empty if {@code query} returns {@code null} and signals an error if it throws
 */
public final Maybe<Cursor> queryStream(final Queryable queryable) {
    final Callable<Cursor> deferredQuery = new Callable<Cursor>() {
        @Override
        public Cursor call() throws Exception {
            return query(queryable);
        }
    };
    return Maybe.fromCallable(deferredQuery);
}
/**
 * Fetches a single row from a database and maps it to an object of type {@code T}, exposed
 * as a deferred operation: {@code executeBlocking()} is not called here but each time the
 * returned {@link Maybe} is subscribed to.
 *
 * <p>NOTE(review): no scheduler is applied in this method, so the blocking call presumably
 * runs on the subscribing thread unless the caller uses {@code subscribeOn} - confirm.
 *
 * @return a {@link Maybe} that passes the object mapped from the database record to
 *     {@link io.reactivex.observers.DisposableMaybeObserver#onSuccess(Object)}, or completes
 *     empty when {@code executeBlocking()} returns {@code null}
 */
@Override
public Maybe<T> asMaybe() {
    final Callable<T> blockingFetch = new Callable<T>() {
        @Override
        public T call() {
            return executeBlocking();
        }
    };
    return Maybe.fromCallable(blockingFetch);
}
/**
 * Fetches multiple rows from a database and maps them to objects of type {@code T}, exposed
 * as a deferred operation: {@code executeBlocking()} is not called here but each time the
 * returned {@link Maybe} is subscribed to.
 *
 * <p>NOTE(review): no scheduler is applied in this method, so the blocking call presumably
 * runs on the subscribing thread unless the caller uses {@code subscribeOn} - confirm.
 *
 * @return a {@link Maybe} that passes the list of objects mapped from the database records to
 *     {@link io.reactivex.observers.DisposableMaybeObserver#onSuccess(Object)}, or completes
 *     empty when {@code executeBlocking()} returns {@code null}
 */
@Override
public Maybe<List<T>> asMaybe() {
    final Callable<List<T>> blockingFetchAll = new Callable<List<T>>() {
        @Override
        public List<T> call() {
            return executeBlocking();
        }
    };
    return Maybe.fromCallable(blockingFetchAll);
}
/**
 * Saves one or more records in a table, exposed as a deferred operation:
 * {@code executeBlocking()} is not called here but each time the returned {@link Maybe} is
 * subscribed to.
 *
 * <p>NOTE(review): no scheduler is applied in this method, so the blocking call presumably
 * runs on the subscribing thread unless the caller uses {@code subscribeOn} - confirm.
 *
 * @return a {@link Maybe} that passes the number of records saved to
 *     {@link io.reactivex.observers.DisposableMaybeObserver#onSuccess(Object)}, or completes
 *     empty when {@code executeBlocking()} returns {@code null}
 */
@Override
public Maybe<Integer> asMaybe() {
    final Callable<Integer> blockingSave = new Callable<Integer>() {
        @Override
        public Integer call() {
            return executeBlocking();
        }
    };
    return Maybe.fromCallable(blockingSave);
}
/**
 * Deletes one or more records from a table, exposed as a deferred operation:
 * {@code executeBlocking()} is not called here but each time the returned {@link Maybe} is
 * subscribed to.
 *
 * <p>NOTE(review): no scheduler is applied in this method, so the blocking call presumably
 * runs on the subscribing thread unless the caller uses {@code subscribeOn} - confirm.
 *
 * @return a {@link Maybe} that passes the number of records deleted to
 *     {@link io.reactivex.observers.DisposableMaybeObserver#onSuccess(Object)}, or completes
 *     empty when {@code executeBlocking()} returns {@code null}
 */
@Override
public Maybe<Integer> asMaybe() {
    final Callable<Integer> blockingDelete = new Callable<Integer>() {
        @Override
        public Integer call() {
            return executeBlocking();
        }
    };
    return Maybe.fromCallable(blockingDelete);
}
/**
 * Converts the result stream to a {@link io.reactivex.Maybe}: the value of
 * {@code firstOrNull()} is emitted if it is non-null, otherwise the {@link Maybe} completes
 * without a value. {@code firstOrNull()} is invoked each time the returned {@link Maybe} is
 * subscribed to, not when this method is called.
 *
 * @return maybe instance of the results of this query.
 */
@CheckReturnValue
public Maybe<E> maybe() {
    final Callable<E> firstElement = new Callable<E>() {
        @Override
        public E call() throws Exception {
            return firstOrNull();
        }
    };
    return Maybe.fromCallable(firstElement);
}
/**
 * Reactive wrapper around {@code delegate.findByKey(type, key)}. The delegate is called each
 * time the returned {@link Maybe} is subscribed to, not when this method is called.
 *
 * @param type the entity class to look up
 * @param key the key value passed through to the delegate
 * @param <E> the entity type
 * @param <K> the key type
 * @return a {@link Maybe} emitting the delegate's result; it completes empty if the delegate
 *     returns {@code null} and signals an error if the delegate throws
 */
@Override
public <E extends T, K> Maybe<E> findByKey(final Class<E> type, final K key) {
    final Callable<E> lookup = new Callable<E>() {
        @Override
        public E call() throws Exception {
            return delegate.findByKey(type, key);
        }
    };
    return Maybe.fromCallable(lookup);
}
/**
 * Gets the last cached campaign response, preferring the in-memory copy and falling back to
 * the storage client when nothing is held in memory. A response read from storage is
 * remembered in memory on success; the in-memory copy is cleared if the chain errors.
 *
 * <p>Returns {@link Maybe#empty()} if any of the following are true
 *
 * <ul>
 *   <li>If the storage client returns {@link Maybe#empty()}.
 *   <li>If the ttl on the cached proto is set and has expired.
 *   <li>If the ttl on the cached proto is not set and the proto file is older than 1 {@link
 *       TimeUnit#DAYS}.
 * </ul>
 *
 * <p>The ttl and age conditions are whatever {@code isResponseValid} enforces; that method is
 * applied as a filter to the response.
 *
 * @return a {@link Maybe} emitting the cached or stored response when it passes
 *     {@code isResponseValid}, and empty otherwise
 */
public Maybe<FetchEligibleCampaignsResponse> get() {
    // Lazily reads the field at subscription time; a null field yields an empty Maybe.
    Maybe<FetchEligibleCampaignsResponse> inMemory = Maybe.fromCallable(() -> cachedResponse);

    Maybe<FetchEligibleCampaignsResponse> fromDisk =
        storageClient.read(FetchEligibleCampaignsResponse.parser());
    Maybe<FetchEligibleCampaignsResponse> fromDiskAndRemember =
        fromDisk.doOnSuccess(stored -> cachedResponse = stored);

    Maybe<FetchEligibleCampaignsResponse> candidate = inMemory.switchIfEmpty(fromDiskAndRemember);
    return candidate
        .filter(this::isResponseValid)
        .doOnError(failure -> cachedResponse = null);
}
/**
 * Read the contents of the file into a proto object using the parser. Since writes are not
 * atomic, the caller will receive {@link Maybe#empty()} when data is corrupt (the parse
 * failure is logged and swallowed). A missing file is treated the same way.
 *
 * <p>Some valid scenarios that can lead to corrupt data :
 *
 * <ul>
 *   <li>Out of disk space while writing
 *   <li>Power outage while writing
 *   <li>Process killed while writing
 * </ul>
 *
 * <p>The file is opened and parsed each time the returned {@link Maybe} is subscribed to, not
 * when this method is called.
 *
 * @param parser the proto parser used to decode the file contents
 * @param <T> the proto message type produced by {@code parser}
 * @return a {@link Maybe} emitting the parsed proto, or empty when the file is missing or
 *     cannot be parsed; other exceptions are signalled as errors
 */
public <T extends AbstractMessageLite> Maybe<T> read(Parser<T> parser) {
    return Maybe.fromCallable(
        () -> {
            // reads / writes are synchronized per client instance
            synchronized (this) {
                // Stays null on a recoverable failure, which fromCallable turns into empty.
                T parsed = null;
                try (FileInputStream in = application.openFileInput(fileName)) {
                    parsed = parser.parseFrom(in);
                } catch (InvalidProtocolBufferException | FileNotFoundException e) {
                    Logging.logi("Recoverable exception while reading cache: " + e.getMessage());
                }
                return parsed;
            }
        });
}