下面列出了怎么用io.reactivex.MaybeEmitter的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Requests the given permissions and exposes the outcome as a {@link Maybe}.
 * <p>
 * Pass an empty list to request all permissions declared in the manifest.
 * The returned {@link Maybe} succeeds with the {@link PermissionResult} when
 * {@code result.isAccepted()} is true, and otherwise fails with an
 * {@code Error} wrapping that result. It never completes empty.
 *
 * @param permissions the permissions to request
 * @return a {@link Maybe} that starts the request when subscribed to
 */
public Maybe<PermissionResult> requestAsMaybe(final List<String> permissions) {
    return Maybe.create(new MaybeOnSubscribe<PermissionResult>() {
        @Override
        public void subscribe(final MaybeEmitter<PermissionResult> emitter) throws Exception {
            // Bridges the callback-style response into the emitter.
            final ResponseCallback forwardToEmitter = new ResponseCallback() {
                @Override
                public void onResponse(PermissionResult result) {
                    if (!result.isAccepted()) {
                        emitter.onError(new Error(result));
                        return;
                    }
                    emitter.onSuccess(result);
                }
            };
            runtimePermission.request(permissions).onResponse(forwardToEmitter).ask();
        }
    });
}
/**
 * Creates a {@link GoogleApiClient} whose connection callbacks are bound to
 * the given emitter, attempts to connect it, and registers a cancellation
 * action that disconnects the client.
 *
 * @param emitter the emitter handed to the connection callbacks; receives
 *                any {@link Throwable} thrown by {@code connect()}
 */
@Override
public final void subscribe(MaybeEmitter<T> emitter) throws Exception {
    final GoogleApiClient client = createApiClient(new ApiClientConnectionCallbacks(emitter));

    try {
        client.connect();
    } catch (Throwable connectFailure) {
        emitter.onError(connectFailure);
    }

    // Registered whether or not connect() threw, so disconnect() is always
    // part of the cancellation action.
    emitter.setCancellable(() -> {
        final boolean stillConnected = client.isConnected();
        if (stillConnected) {
            // Give subclasses a chance to clean up while the client is still usable.
            onUnsubscribed(client);
        }
        client.disconnect();
    });
}
/**
 * Builds a deferred {@link Maybe} that runs this query when subscribed to
 * and forwards the mapped result downstream.
 * <p>
 * If the mapped result is {@code null} the stream completes empty.
 * If the subscriber has already disposed by the time the raw query returns,
 * the cursor (when non-null) is closed and nothing is emitted.
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code run} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @return Deferred {@link Maybe} that executes the query on subscription and
 * emits its result to downstream
 * @see #runBlocking
 */
@NonNull
@CheckResult
public final Maybe<T> run() {
    return Maybe.create(new MaybeOnSubscribe<T>() {
        @Override
        public void subscribe(MaybeEmitter<T> emitter) {
            final Cursor queryCursor = rawQuery(true);
            if (!emitter.isDisposed()) {
                final T mapped = map(queryCursor);
                if (mapped == null) {
                    emitter.onComplete();
                } else {
                    emitter.onSuccess(mapped);
                }
                return;
            }
            // Nobody is listening any more: release the cursor instead of mapping it.
            if (queryCursor != null) {
                queryCursor.close();
            }
        }
    });
}
/**
 * Returns a {@link Maybe} that, on subscription, reads the stored value from
 * {@code file} through {@code converter} inside {@code runInReadLock}.
 * <p>
 * The stream completes empty when the file does not exist or when the
 * converter yields {@code null}; otherwise it succeeds with the value read.
 * Exactly one terminal signal is sent per subscription.
 *
 * @return a deferred {@link Maybe} over the stored value
 */
@Override @NonNull public Maybe<T> get() {
    return Maybe.create(new MaybeOnSubscribe<T>() {
        @Override public void subscribe(final MaybeEmitter<T> emitter) throws Exception {
            runInReadLock(readWriteLock, new ThrowingRunnable() {
                @Override public void run() throws Exception {
                    if (!file.exists()) {
                        emitter.onComplete();
                        return;
                    }

                    T value = converter.read(file, type);
                    if (value == null) {
                        // Stop here: previously execution fell through to
                        // onSuccess(null) after signalling completion.
                        emitter.onComplete();
                        return;
                    }
                    emitter.onSuccess(value);
                }
            });
        }
    });
}
/**
 * Signals {@code v} to the emitter, treating {@code null} as "no value".
 *
 * @param v       the value to emit; {@code null} results in {@code onComplete()}
 * @param emitter the emitter to signal
 */
private static <T> void onSuccess(@Nullable T v, MaybeEmitter<T> emitter) {
    if (v != null) {
        emitter.onSuccess(v);
        return;
    }
    emitter.onComplete();
}
/**
 * Translates an {@code Optional} into a Maybe signal: a present value is
 * emitted via {@code onSuccess}, an absent one completes the emitter.
 *
 * @param emitter  the emitter to signal
 * @param optional the optional whose content decides the signal
 */
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private <T> void manageOptional(MaybeEmitter<T> emitter, Optional<T> optional) {
    if (!optional.isPresent()) {
        emitter.onComplete();
        return;
    }
    emitter.onSuccess(optional.get());
}
/**
 * Loads the request URL in {@code webDriver}, runs the configured
 * {@code actions} against it (when {@code Preconditions.isNotBlank(actions)}
 * holds), and maps the resulting page source into a {@link Response}.
 * <p>
 * When {@code webDriver} is {@code null} the returned {@link Maybe} completes
 * empty rather than never terminating.
 *
 * @param request the request whose URL is loaded; also passed to
 *                {@code DownloaderDelayTransformer}
 * @return a {@link Maybe} emitting the response built from the page source,
 *         or completing empty when no driver is available
 */
@Override
public Maybe<Response> download(Request request) {
    return Maybe.create(new MaybeOnSubscribe<String>() {
        @Override
        public void subscribe(MaybeEmitter<String> emitter) throws Exception {
            if (webDriver == null) {
                // Without a driver there is nothing to emit; send a terminal
                // signal so subscribers are not left waiting forever.
                emitter.onComplete();
                return;
            }

            webDriver.get(request.getUrl());

            if (Preconditions.isNotBlank(actions)) {
                actions.forEach(
                        action -> action.perform(webDriver)
                );
            }

            emitter.onSuccess(webDriver.getPageSource());
        }
    })
            .compose(new DownloaderDelayTransformer(request))
            .map(new Function<String, Response>() {
                @Override
                public Response apply(String html) throws Exception {
                    Response response = new Response();
                    // NOTE(review): getBytes() uses the platform default charset.
                    response.setContent(html.getBytes());
                    response.setStatusCode(Constant.OK_STATUS_CODE);
                    response.setContentType(getContentType(webDriver));
                    return response;
                }
            });
}
/**
 * Queries the fused location provider for the last known location and
 * signals it to the emitter; completes empty when no location is returned.
 *
 * @param apiClient the client passed to {@code getLastLocation}
 * @param emitter   the emitter that receives the location or completion
 */
@Override
protected void onGoogleApiClientReady(GoogleApiClient apiClient, MaybeEmitter<Location> emitter) {
    //noinspection MissingPermission
    final Location lastKnown = LocationServices.FusedLocationApi.getLastLocation(apiClient);
    if (lastKnown == null) {
        emitter.onComplete();
        return;
    }
    emitter.onSuccess(lastKnown);
}
/**
 * Emits the current {@link FirebaseUser} if there is one, otherwise
 * completes empty. Does nothing when the emitter is already disposed.
 * <p>
 * Exactly one terminal signal is sent: {@code onSuccess} for a non-null user,
 * {@code onComplete} otherwise.
 *
 * @param emitter the emitter to signal
 */
@Override
public void subscribe(MaybeEmitter<FirebaseUser> emitter) throws Exception {
    if (emitter.isDisposed()) {
        return;
    }
    FirebaseUser user = instance.getCurrentUser();
    if (null != user) {
        emitter.onSuccess(user);
    } else {
        // Only complete when nothing was emitted; previously onComplete()
        // was also called right after onSuccess().
        emitter.onComplete();
    }
}
/**
 * Generates the RX support members on the class under construction.
 * <p>
 * Adds the protected {@code globalSubscribeOn} and {@code globalObserveOn}
 * scheduler fields, each with a public fluent setter returning
 * {@code dataSourceName}. It then generates the RX interfaces for every
 * emitter class, in both batch and transaction flavours, followed by the
 * execute-transaction and execute-batch methods for every RX type.
 *
 * @param dataSourceName
 *            the data source name, used as return type of the fluent setters
 * @param daoFactory
 *            the dao factory
 */
public void generateRx(ClassName dataSourceName, String daoFactory) {
    // globalSubscribeOn: field plus fluent setter
    final FieldSpec subscribeOnField = FieldSpec
            .builder(Scheduler.class, "globalSubscribeOn", Modifier.PROTECTED)
            .build();
    classBuilder.addField(subscribeOnField);

    final MethodSpec subscribeOnSetter = MethodSpec.methodBuilder("globalSubscribeOn")
            .returns(dataSourceName)
            .addParameter(Scheduler.class, "scheduler")
            .addModifiers(Modifier.PUBLIC)
            .addStatement("this.globalSubscribeOn=scheduler")
            .addStatement("return this")
            .build();
    classBuilder.addMethod(subscribeOnSetter);

    // globalObserveOn: field plus fluent setter
    final FieldSpec observeOnField = FieldSpec
            .builder(Scheduler.class, "globalObserveOn", Modifier.PROTECTED)
            .build();
    classBuilder.addField(observeOnField);

    final MethodSpec observeOnSetter = MethodSpec.methodBuilder("globalObserveOn")
            .addParameter(Scheduler.class, "scheduler")
            .returns(dataSourceName)
            .addModifiers(Modifier.PUBLIC)
            .addStatement("this.globalObserveOn=scheduler")
            .addStatement("return this")
            .build();
    classBuilder.addMethod(observeOnSetter);

    // One batch and one transaction interface per emitter flavour.
    generateRxInterface(daoFactory, RxInterfaceType.BATCH, ObservableEmitter.class);
    generateRxInterface(daoFactory, RxInterfaceType.TRANSACTION, ObservableEmitter.class);

    generateRxInterface(daoFactory, RxInterfaceType.BATCH, SingleEmitter.class);
    generateRxInterface(daoFactory, RxInterfaceType.TRANSACTION, SingleEmitter.class);

    generateRxInterface(daoFactory, RxInterfaceType.BATCH, FlowableEmitter.class);
    generateRxInterface(daoFactory, RxInterfaceType.TRANSACTION, FlowableEmitter.class);

    generateRxInterface(daoFactory, RxInterfaceType.BATCH, MaybeEmitter.class);
    generateRxInterface(daoFactory, RxInterfaceType.TRANSACTION, MaybeEmitter.class);

    // Explicit order (not RxType.values()) so generation order stays fixed.
    final RxType[] rxTypes = {RxType.OBSERVABLE, RxType.SINGLE, RxType.FLOWABLE, RxType.MAYBE};

    for (RxType rxType : rxTypes) {
        generatExecuteTransactionRx(dataSourceName, daoFactory, rxType);
    }

    for (RxType rxType : rxTypes) {
        generatExecuteBatchRx(dataSourceName, daoFactory, rxType);
    }
}
/**
 * Executes the prepared operation synchronously and relays its outcome:
 * a non-null result is emitted via {@code onSuccess}, a {@code null} result
 * completes the emitter, and any {@link Exception} raised inside the
 * try block is forwarded through {@code onError}.
 *
 * @param emitter the emitter to signal
 */
@Override
public void subscribe(@NonNull MaybeEmitter<Result> emitter) throws Exception {
    try {
        final Result outcome = preparedOperation.executeAsBlocking();

        if (outcome == null) {
            emitter.onComplete();
            return;
        }
        emitter.onSuccess(outcome);
    } catch (Exception failure) {
        emitter.onError(failure);
    }
}
/**
 * Stores the emitter in the {@code emitter} field of this callbacks object.
 *
 * @param emitter the emitter to keep
 */
private ApiClientConnectionCallbacks(MaybeEmitter<T> emitter) {
this.emitter = emitter;
}
/**
 * Callback declaration (no body here) that receives the DAO factory and the
 * {@code MaybeEmitter} and returns a {@code TransactionResult}.
 * NOTE(review): presumably the transaction variant of the generated RX
 * interfaces; confirm against the generator.
 */
TransactionResult onExecute(BindKripton180RawInsertSelectDaoFactory daoFactory,
MaybeEmitter<T> emitter);
/**
 * Same callback shape as above for {@code BindKripton180BeanInsertSelectDaoFactory}.
 */
TransactionResult onExecute(BindKripton180BeanInsertSelectDaoFactory daoFactory,
MaybeEmitter<T> emitter);
/**
 * Abstract hook that receives a {@code GoogleApiClient} and the emitter to signal.
 * NOTE(review): presumably invoked once the client has connected; the call
 * site is not visible here, confirm in the connection callbacks.
 *
 * @param apiClient the API client to use
 * @param emitter   the emitter through which the implementation reports its result
 */
protected abstract void onGoogleApiClientReady(GoogleApiClient apiClient, MaybeEmitter<T> emitter);
// Callback declarations (no bodies), each taking a DAO factory and a MaybeEmitter<T>.
// A void and a TransactionResult declaration with identical parameters cannot
// share one type, so these lines must come from different interfaces.
// NOTE(review): presumably void = batch callback, TransactionResult =
// transaction callback; confirm against the generator.

// BindAppDaoFactory: void variant, then TransactionResult variant.
void onExecute(BindAppDaoFactory daoFactory, MaybeEmitter<T> emitter);
TransactionResult onExecute(BindAppDaoFactory daoFactory, MaybeEmitter<T> emitter);
// BindUserDaoFactory: void variant, then TransactionResult variant.
void onExecute(BindUserDaoFactory daoFactory, MaybeEmitter<T> emitter);
TransactionResult onExecute(BindUserDaoFactory daoFactory, MaybeEmitter<T> emitter);
// Void variants for the two Kripton180 DAO factories.
void onExecute(BindKripton180RawInsertSelectDaoFactory daoFactory, MaybeEmitter<T> emitter);
void onExecute(BindKripton180BeanInsertSelectDaoFactory daoFactory, MaybeEmitter<T> emitter);
// BindAppDaoFactory again (same signatures as the first pair above).
void onExecute(BindAppDaoFactory daoFactory, MaybeEmitter<T> emitter);
TransactionResult onExecute(BindAppDaoFactory daoFactory, MaybeEmitter<T> emitter);
// BindApp0DaoFactory: void variant, then TransactionResult variant.
void onExecute(BindApp0DaoFactory daoFactory, MaybeEmitter<T> emitter);
TransactionResult onExecute(BindApp0DaoFactory daoFactory, MaybeEmitter<T> emitter);
// BindAppDaoFactory, third occurrence of the same pair.
void onExecute(BindAppDaoFactory daoFactory, MaybeEmitter<T> emitter);
TransactionResult onExecute(BindAppDaoFactory daoFactory, MaybeEmitter<T> emitter);
// BindApp0DaoFactory, second occurrence of the same pair.
void onExecute(BindApp0DaoFactory daoFactory, MaybeEmitter<T> emitter);
TransactionResult onExecute(BindApp0DaoFactory daoFactory, MaybeEmitter<T> emitter);
// BindXenoDaoFactory: void variant only in this view.
void onExecute(BindXenoDaoFactory daoFactory, MaybeEmitter<T> emitter);