下面列出了 io.reactivex.Maybe#create() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Adapts a {@link Uni} to a {@link Maybe}.
 * <p>
 * On subscription the Uni is subscribed to and exposed as a {@link CompletableFuture}.
 * A failure is forwarded as {@code onError}, a non-null item as {@code onSuccess}
 * and a {@code null} item as {@code onComplete} (empty Maybe). Cancelling the
 * emitter cancels the future, and nothing is emitted once the future is cancelled.
 *
 * @param uni the Uni to adapt
 * @return a Maybe mirroring the outcome of the Uni
 */
@Override
public Maybe<T> apply(Uni<T> uni) {
    return Maybe.create(emitter -> {
        CompletableFuture<T> future = uni.subscribe().asCompletionStage();
        // Propagate downstream disposal to the underlying future.
        emitter.setCancellable(() -> future.cancel(false));
        future.whenComplete((res, fail) -> {
            if (future.isCancelled()) {
                // Downstream is gone; there is nobody left to notify.
                return;
            }
            if (fail != null) {
                emitter.onError(fail);
            } else if (res != null) {
                // onSuccess is a terminal signal for Maybe; it must not be followed by onComplete.
                emitter.onSuccess(res);
            } else {
                emitter.onComplete();
            }
        });
    });
}
/**
 * Looks up the schema of an identity provider plugin.
 * <p>
 * The returned {@link Maybe} emits the schema when the plugin manager returns one,
 * completes empty when it returns {@code null}, and signals a
 * {@code TechnicalManagementException} when an exception is caught.
 *
 * @param identityProviderId identifier of the identity provider plugin
 * @return a Maybe carrying the schema, if any
 */
@Override
public Maybe<String> getSchema(String identityProviderId) {
    LOGGER.debug("Find identity provider plugin schema by ID: {}", identityProviderId);
    return Maybe.create(sink -> {
        try {
            final String pluginSchema = identityProviderPluginManager.getSchema(identityProviderId);
            if (pluginSchema == null) {
                // No schema available: finish without a value.
                sink.onComplete();
                return;
            }
            sink.onSuccess(pluginSchema);
        } catch (Exception e) {
            LOGGER.error("An error occurs while trying to get schema for identity provider plugin {}", identityProviderId, e);
            sink.onError(new TechnicalManagementException("An error occurs while trying to get schema for identity provider plugin " + identityProviderId, e));
        }
    });
}
/**
 * Verifies that {@code MaybeOnSubscribeExecuteAsBlocking} does not touch the prepared
 * operation until the {@link Maybe} is subscribed to, and that the value returned by
 * {@code executeAsBlocking()} is then delivered to the observer.
 */
@SuppressWarnings("CheckResult")
@Test
public void shouldExecuteAsBlockingAfterSubscription() {
    //noinspection unchecked
    final PreparedMaybeOperation<String, String, String> operation = mock(PreparedMaybeOperation.class);

    final String expected = "test";
    when(operation.executeAsBlocking()).thenReturn(expected);

    final TestObserver<String> observer = new TestObserver<String>();

    // Nothing may happen before the Maybe is created...
    verifyZeroInteractions(operation);

    final MaybeOnSubscribeExecuteAsBlocking<String, String, String> onSubscribe =
            new MaybeOnSubscribeExecuteAsBlocking<String, String, String>(operation);
    final Maybe<String> source = Maybe.create(onSubscribe);

    // ...nor between creation and subscription.
    verifyZeroInteractions(operation);

    source.subscribe(observer);

    observer.assertValue(expected);
    observer.assertNoErrors();
    observer.assertComplete();

    verify(operation).executeAsBlocking();
}
/**
 * Looks up the schema of a factor plugin.
 * <p>
 * The returned {@link Maybe} emits the schema when the plugin manager returns one,
 * completes empty when it returns {@code null}, and signals a
 * {@code TechnicalManagementException} when an exception is caught.
 *
 * @param factorId identifier of the factor plugin
 * @return a Maybe carrying the schema, if any
 */
@Override
public Maybe<String> getSchema(String factorId) {
    LOGGER.debug("Find authenticator plugin schema by ID: {}", factorId);
    return Maybe.create(sink -> {
        try {
            final String pluginSchema = factorPluginManager.getSchema(factorId);
            if (pluginSchema == null) {
                // No schema available: finish without a value.
                sink.onComplete();
                return;
            }
            sink.onSuccess(pluginSchema);
        } catch (Exception e) {
            LOGGER.error("An error occurs while trying to get schema for factor plugin {}", factorId, e);
            sink.onError(new TechnicalManagementException("An error occurs while trying to get schema for factor plugin " + factorId, e));
        }
    });
}
/**
 * Finds a reporter plugin by its identifier.
 * <p>
 * The returned {@link Maybe} emits the converted plugin when the plugin manager
 * returns one, completes empty when it returns {@code null}, and signals a
 * {@code TechnicalManagementException} when an exception is caught.
 *
 * @param reporterId identifier of the reporter plugin
 * @return a Maybe carrying the reporter plugin, if any
 */
@Override
public Maybe<ReporterPlugin> findById(String reporterId) {
    LOGGER.debug("Find reporter plugin by ID: {}", reporterId);
    return Maybe.create(sink -> {
        try {
            final Plugin found = reporterPluginManager.findById(reporterId);
            if (found == null) {
                // Unknown plugin: finish without a value.
                sink.onComplete();
                return;
            }
            sink.onSuccess(convert(found));
        } catch (Exception ex) {
            LOGGER.error("An error occurs while trying to get reporter plugin : {}", reporterId, ex);
            sink.onError(new TechnicalManagementException("An error occurs while trying to get reporter plugin : " + reporterId, ex));
        }
    });
}
/**
 * Bridges a callback-style operation that reports through a {@code MaybeConsumer}
 * into an RxJava {@link Maybe}.
 *
 * <p>When the operation calls {@code success} with a non-null value the Maybe emits that
 * value; a {@code null} value completes the Maybe empty; a call to {@code fail} signals
 * the given exception as an error.
 *
 * <p>Example:
 *
 * <pre>
 *     // log the name of the last used unarchived experiment
 *     DataController dc = getDataController();
 *     MaybeConsumers.buildMaybe(mc -> dc.getLastUsedUnarchivedExperiment(mc))
 *                   .subscribe(experiment -> log("Name: " + experiment.getName()));
 * </pre>
 *
 * @param c the operation to run on subscription; it receives the consumer to report to
 * @return a Maybe reflecting what the operation reports
 */
public static <T> Maybe<T> buildMaybe(io.reactivex.functions.Consumer<MaybeConsumer<T>> c) {
    return Maybe.create(emitter -> {
        final MaybeConsumer<T> bridge = new MaybeConsumer<T>() {
            @Override
            public void success(T value) {
                if (value != null) {
                    emitter.onSuccess(value);
                } else {
                    // Maybe cannot carry null, so report "no value" instead.
                    emitter.onComplete();
                }
            }

            @Override
            public void fail(Exception e) {
                emitter.onError(e);
            }
        };
        c.accept(bridge);
    });
}
/**
 * Retrieves a certificate provider from {@code certificateProviders} by certificate id.
 * <p>
 * The returned {@link Maybe} emits the provider when one is found, completes empty when
 * the lookup yields {@code null}, and forwards any caught exception as an error.
 *
 * @param certificateId identifier of the certificate
 * @return a Maybe carrying the certificate provider, if any
 */
@Override
// TODO : refactor (after JWKS information)
public Maybe<CertificateProvider> getCertificateProvider(String certificateId) {
    return Maybe.create(sink -> {
        try {
            final CertificateProvider provider = this.certificateProviders.get(certificateId);
            if (provider == null) {
                // Nothing stored under this id: finish without a value.
                sink.onComplete();
                return;
            }
            sink.onSuccess(provider);
        } catch (Exception e) {
            sink.onError(e);
        }
    });
}
/**
 * Looks up the schema of a policy plugin.
 * <p>
 * The returned {@link Maybe} emits the schema when the plugin manager returns one,
 * completes empty when it returns {@code null}, and signals a
 * {@code TechnicalManagementException} when an exception is caught.
 *
 * @param policyId identifier of the policy plugin
 * @return a Maybe carrying the schema, if any
 */
@Override
public Maybe<String> getSchema(String policyId) {
    LOGGER.debug("Find policy plugin schema by ID: {}", policyId);
    return Maybe.create(sink -> {
        try {
            final String pluginSchema = policyPluginManager.getSchema(policyId);
            if (pluginSchema == null) {
                // No schema available: finish without a value.
                sink.onComplete();
                return;
            }
            sink.onSuccess(pluginSchema);
        } catch (Exception e) {
            LOGGER.error("An error occurs while trying to get schema for policy plugin {}", policyId, e);
            sink.onError(new TechnicalManagementException("An error occurs while trying to get schema for policy plugin " + policyId, e));
        }
    });
}
/**
 * Finds an extension grant plugin by its identifier.
 * <p>
 * The returned {@link Maybe} emits the converted plugin when the plugin manager
 * returns one, completes empty when it returns {@code null}, and signals a
 * {@code TechnicalManagementException} when an exception is caught.
 *
 * @param extensionGrantPluginId identifier of the extension grant plugin
 * @return a Maybe carrying the extension grant plugin, if any
 */
@Override
public Maybe<ExtensionGrantPlugin> findById(String extensionGrantPluginId) {
    LOGGER.debug("Find extension grant plugin by ID: {}", extensionGrantPluginId);
    return Maybe.create(sink -> {
        try {
            final Plugin found = extensionGrantPluginManager.findById(extensionGrantPluginId);
            if (found == null) {
                // Unknown plugin: finish without a value.
                sink.onComplete();
                return;
            }
            sink.onSuccess(convert(found));
        } catch (Exception ex) {
            LOGGER.error("An error occurs while trying to get extension grant plugin : {}", extensionGrantPluginId, ex);
            sink.onError(new TechnicalManagementException("An error occurs while trying to get extension grant plugin : " + extensionGrantPluginId, ex));
        }
    });
}
/**
 * Builds a deferred {@link Maybe} that runs the query when subscribed to and passes
 * the mapped result downstream.
 * <p>
 * If the mapped result is {@code null} the Maybe completes without a value. If the
 * subscriber has already disposed by the time the raw query returns, the cursor
 * (when non-null) is closed and nothing is emitted.
 * <dl>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code run} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @return Deferred {@link Maybe} that when subscribed to executes the query and emits
 * its result to downstream
 * @see #runBlocking
 */
@NonNull
@CheckResult
public final Maybe<T> run() {
    return Maybe.create(new MaybeOnSubscribe<T>() {
        @Override
        public void subscribe(MaybeEmitter<T> emitter) {
            final Cursor queryCursor = rawQuery(true);
            if (!emitter.isDisposed()) {
                final T mapped = map(queryCursor);
                if (mapped == null) {
                    emitter.onComplete();
                } else {
                    emitter.onSuccess(mapped);
                }
                return;
            }
            // Subscriber went away while the query was running: release the cursor.
            if (queryCursor != null) {
                queryCursor.close();
            }
        }
    });
}
/**
 * Finds an identity provider plugin by its identifier.
 * <p>
 * The returned {@link Maybe} emits the converted plugin when the plugin manager
 * returns one, completes empty when it returns {@code null}, and signals a
 * {@code TechnicalManagementException} when an exception is caught.
 *
 * @param identityProviderId identifier of the identity provider plugin
 * @return a Maybe carrying the identity provider plugin, if any
 */
@Override
public Maybe<IdentityProviderPlugin> findById(String identityProviderId) {
    LOGGER.debug("Find identity provider plugin by ID: {}", identityProviderId);
    return Maybe.create(sink -> {
        try {
            final Plugin found = identityProviderPluginManager.findById(identityProviderId);
            if (found == null) {
                // Unknown plugin: finish without a value.
                sink.onComplete();
                return;
            }
            sink.onSuccess(convert(found));
        } catch (Exception ex) {
            LOGGER.error("An error occurs while trying to get identity provider plugin : {}", identityProviderId, ex);
            sink.onError(new TechnicalManagementException("An error occurs while trying to get identity provider plugin : " + identityProviderId, ex));
        }
    });
}
/**
 * Adapts a {@link Multi} to a {@link Maybe}.
 * <p>
 * On subscription the Multi is subscribed to. An item is forwarded as {@code onSuccess},
 * a failure as {@code onError}, and completion as {@code onComplete}. Because a Maybe
 * accepts only one terminal signal, only the first of these reaches the downstream
 * observer; later signals are ignored by the emitter.
 *
 * @param multi the Multi to adapt
 * @return a Maybe reflecting the first signal of the Multi
 */
@Override
public Maybe<T> apply(Multi<T> multi) {
    return Maybe.create(emitter -> {
        // NOTE(review): the upstream subscription is not cancelled after the first item
        // or on downstream disposal — confirm whether that is intended.
        multi.subscribe().with(
                // onSuccess is terminal for Maybe; it must not be followed by onComplete.
                item -> emitter.onSuccess(item),
                emitter::onError,
                emitter::onComplete);
    });
}
/**
 * Runs a search asynchronously through {@code restHighLevelClient} and exposes the
 * outcome as a {@link Maybe}.
 * <p>
 * The request is issued on subscription. The response is emitted via {@code onSuccess};
 * a failure reported to the listener is forwarded via {@code onError}.
 *
 * @param searchRequest the request to execute
 * @return a Maybe carrying the search response
 */
public Maybe<SearchResponse> search(SearchRequest searchRequest) {
    return Maybe.create(emitter -> {
        final ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
            @Override
            public void onResponse(SearchResponse response) {
                emitter.onSuccess(response);
            }

            @Override
            public void onFailure(Exception failure) {
                emitter.onError(failure);
            }
        };
        restHighLevelClient.searchAsync(searchRequest, listener);
    });
}
/**
 * Wraps a {@code Task} in a {@link Maybe}.
 * <p>
 * Listeners are attached on subscription. A non-null task result is emitted via
 * {@code onSuccess}; a {@code null} result (for example from a {@code Task<Void>})
 * completes the Maybe empty, since a Maybe cannot carry {@code null}; a task failure
 * is forwarded via {@code onError}.
 *
 * @param task the task to observe
 * @return a Maybe reflecting the outcome of the task
 */
private static <T> Maybe<T> taskToMaybe(Task<T> task) {
    return Maybe.create(
        emitter -> {
            task.addOnSuccessListener(
                result -> {
                    // Exactly one terminal signal: a value if there is one, otherwise completion.
                    if (result != null) {
                        emitter.onSuccess(result);
                    } else {
                        emitter.onComplete();
                    }
                });
            task.addOnFailureListener(
                e -> {
                    // onError is terminal; no onComplete may follow it.
                    emitter.onError(e);
                });
        });
}
/**
 * Creates a {@link Maybe} that resolves the directory settings for a location.
 * <p>
 * On subscription the location's current path is taken; when that path is a file its
 * parent path is used instead. The settings returned by {@code getDirectorySettings}
 * are emitted, or the Maybe completes empty when they are {@code null}.
 *
 * @param targetLocation the location whose directory settings are wanted
 * @return a Maybe carrying the directory settings, if any
 */
public static Maybe<DirectorySettings> create(Location targetLocation)
{
    return Maybe.create(emitter -> {
        final Path current = targetLocation.getCurrentPath();
        // Settings belong to directories, so step up from a file to its parent.
        final Path directory = current.isFile() ? current.getParentPath() : current;

        final DirectorySettings settings = getDirectorySettings(directory);
        if(settings != null)
        {
            emitter.onSuccess(settings);
        }
        else
        {
            emitter.onComplete();
        }
    });
}
/**
 * Builds a {@link Maybe} that emits {@code input} on subscription.
 * <p>
 * {@code RequestContext.current()} is invoked once when this method is called and once
 * more when the Maybe is subscribed to; the returned contexts are discarded.
 * NOTE(review): presumably these calls check that a request context is available at
 * both points — confirm against {@code RequestContext}.
 *
 * @param input the value to emit
 * @return a Maybe emitting {@code input}
 */
private static Maybe<String> maybe(String input) {
    // Assembly-time lookup.
    RequestContext.current();
    return Maybe.create(sink -> {
        // Subscription-time lookup.
        RequestContext.current();
        sink.onSuccess(input);
    });
}
/**
 * Creates a {@link Maybe} that executes the given prepared operation through
 * {@code MaybeOnSubscribeExecuteAsBlocking}, then hands it to
 * {@code subscribeOn(storIOSQLite, ...)}.
 * <p>
 * {@code throwExceptionIfRxJava2IsNotAvailable("asRxMaybe()")} is called first.
 *
 * @param storIOSQLite the StorIOSQLite instance passed on to {@code subscribeOn}
 * @param operation    the prepared operation to wrap
 * @return the Maybe returned by {@code subscribeOn}
 */
@CheckResult
@NonNull
public static <Result, WrappedResult, Data> Maybe<Result> createMaybe(
        @NonNull StorIOSQLite storIOSQLite,
        @NonNull PreparedMaybeOperation<Result, WrappedResult, Data> operation
) {
    throwExceptionIfRxJava2IsNotAvailable("asRxMaybe()");

    final MaybeOnSubscribeExecuteAsBlocking<Result, WrappedResult, Data> onSubscribe =
            new MaybeOnSubscribeExecuteAsBlocking<Result, WrappedResult, Data>(operation);

    return subscribeOn(storIOSQLite, Maybe.create(onSubscribe));
}
/**
 * Creates a {@link Maybe} that executes the given prepared operation through
 * {@code MaybeOnSubscribeExecuteAsBlocking}, then hands it to
 * {@code subscribeOn(storIOContentResolver, ...)}.
 * <p>
 * {@code throwExceptionIfRxJava2IsNotAvailable("asRxMaybe()")} is called first.
 *
 * @param storIOContentResolver the StorIOContentResolver instance passed on to {@code subscribeOn}
 * @param operation             the prepared operation to wrap
 * @return the Maybe returned by {@code subscribeOn}
 */
@CheckResult
@NonNull
public static <Result, WrappedResult, Data> Maybe<Result> createMaybe(
        @NonNull StorIOContentResolver storIOContentResolver,
        @NonNull PreparedMaybeOperation<Result, WrappedResult, Data> operation
) {
    throwExceptionIfRxJava2IsNotAvailable("asRxMaybe()");

    final MaybeOnSubscribeExecuteAsBlocking<Result, WrappedResult, Data> onSubscribe =
            new MaybeOnSubscribeExecuteAsBlocking<Result, WrappedResult, Data>(operation);

    return subscribeOn(storIOContentResolver, Maybe.create(onSubscribe));
}
/**
 * Verifies that an exception thrown by {@code executeAsBlocking()} is delivered to the
 * observer as an error, that the observer is not completed, and that the operation is
 * not touched before subscription.
 */
@SuppressWarnings("CheckResult")
@Test
public void shouldCallOnErrorIfExceptionOccurred() {
    //noinspection unchecked
    final PreparedMaybeOperation<Object, Object, Object> operation = mock(PreparedMaybeOperation.class);

    final StorIOException failure = new StorIOException("test exception");
    when(operation.executeAsBlocking()).thenThrow(failure);

    final TestObserver<Object> observer = new TestObserver<Object>();

    final MaybeOnSubscribeExecuteAsBlocking<Object, Object, Object> onSubscribe =
            new MaybeOnSubscribeExecuteAsBlocking<Object, Object, Object>(operation);
    final Maybe<Object> source = Maybe.create(onSubscribe);

    // Creating the Maybe alone must not run the operation.
    verifyZeroInteractions(operation);

    source.subscribe(observer);

    observer.assertError(failure);
    observer.assertNotComplete();

    verify(operation).executeAsBlocking();
}
/**
 * Creates a {@link Maybe} backed by a {@code LocationLastMaybeOnSubscribe} built from
 * {@code rxLocation}.
 * <p>
 * Requires either the coarse or the fine location permission, as declared by the
 * {@code @RequiresPermission} annotation.
 *
 * @return a Maybe of {@code Location}
 */
@RequiresPermission(anyOf = {Manifest.permission.ACCESS_COARSE_LOCATION, Manifest.permission.ACCESS_FINE_LOCATION})
public Maybe<Location> lastLocation() {
    final LocationLastMaybeOnSubscribe onSubscribe = new LocationLastMaybeOnSubscribe(rxLocation);
    return Maybe.create(onSubscribe);
}