下面列出了 io.reactivex.Completable 类的 API 用法示例代码;你也可以点击链接到 GitHub 查看完整源代码。
/**
 * Deploys to Resteasy after registering scanned classes on the deployment.
 *
 * <p>On subscription, reads the {@code "scan"} array from the global configuration. When it is
 * present, the listed packages are scanned and every non-abstract class annotated with
 * {@code Path} is added to the deployment's resource classes, and every non-abstract class
 * annotated with {@code Provider} is added to its provider classes. When it is absent, a warning
 * is printed to {@code System.err} and nothing is scanned. In both cases the parent
 * implementation is then invoked.
 *
 * @param deployment the deployment that receives the discovered classes
 * @return the parent's deployment {@link Completable}, wrapped in {@code Completable.defer}
 */
@Override
public Completable deployToResteasy(VertxResteasyDeployment deployment) {
    return Completable.defer(() -> {
        JsonArray scanConfig = AppGlobals.get().getConfig().getJsonArray("scan");
        if (scanConfig != null) {
            // getList() is a raw list, hence the explicit cast of the resulting array.
            String[] scannedPackages =
                    (String[]) scanConfig.getList().toArray(new String[scanConfig.size()]);
            new FastClasspathScanner(scannedPackages)
                    .matchClassesWithAnnotation(Path.class, resource -> {
                        if (!Modifier.isAbstract(resource.getModifiers())) {
                            deployment.getActualResourceClasses().add(resource);
                        }
                    })
                    .matchClassesWithAnnotation(Provider.class, provider -> {
                        if (!Modifier.isAbstract(provider.getModifiers())) {
                            deployment.getActualProviderClasses().add(provider);
                        }
                    })
                    .scan();
        } else {
            System.err.println("Not scanning any packages, please specify the 'scan' array of packages in configuration");
        }
        return super.deployToResteasy(deployment);
    });
}
/**
 * Checks that a DELETE on {@code domains/{domainId}/emails/{emailId}} answers 204 when the domain
 * lookup succeeds and both the template deletion and the email manager deletion complete.
 */
@Test
public void shouldDelete() {
    final String domainId = "domain-1";
    final String emailId = "email-1";

    final Domain domain = new Domain();
    domain.setId(domainId);

    doReturn(Maybe.just(domain)).when(domainService).findById(domainId);
    doReturn(Completable.complete()).when(emailTemplateService).delete(eq(emailId), any());
    doReturn(Completable.complete()).when(emailManager).deleteEmail(any());

    final Response deletion =
            target("domains").path(domainId).path("emails").path(emailId).request().delete();

    assertEquals(HttpStatusCode.NO_CONTENT_204, deletion.getStatus());
}
/**
 * Naive trading rule: when the quote is for {@code company}, either sells or buys
 * {@code numberOfShares} depending on {@code TraderUtils.timeToSell()}, logging the attempt and
 * its outcome to standard output.
 *
 * @param company        name of the company this trader follows
 * @param numberOfShares number of shares to buy or sell
 * @param portfolio      portfolio service used to place the order
 * @param quote          market quote; its {@code "name"} entry is compared with {@code company}
 * @return a {@link Completable} for the placed order, or an already-completed one when the quote
 *         is for another company
 */
public static Completable dumbTradingLogic(String company, int numberOfShares, io.vertx.workshop.portfolio.reactivex.PortfolioService portfolio, JsonObject quote) {
    // Quotes about other companies are ignored.
    if (!quote.getString("name").equals(company)) {
        return Completable.complete();
    }

    if (TraderUtils.timeToSell()) {
        System.out.println("Trying to sell " + numberOfShares + " " + company);
        return portfolio.rxSell(numberOfShares, quote)
                .doOnSuccess(result ->
                        System.out.println("Sold " + numberOfShares + " of " + company + "!"))
                .doOnError(failure ->
                        System.out.println("D'oh, failed to sell " + numberOfShares + " of "
                                + company + ": " + failure.getMessage()))
                .toCompletable();
    }

    System.out.println("Trying to buy " + numberOfShares + " " + company);
    return portfolio.rxBuy(numberOfShares, quote)
            .doOnSuccess(result ->
                    System.out.println("Bought " + numberOfShares + " of " + company + " !"))
            .doOnError(failure ->
                    System.out.println("D'oh, failed to buy " + numberOfShares + " of " + company + " : "
                            + failure.getMessage()))
            .toCompletable();
}
/**
 * Checks that a DELETE on {@code domains/{domainId}/users/{userId}/consents} answers 204 when the
 * domain lookup succeeds and the scope approval revocation completes.
 */
@Test
public void shouldRevokeUserConsents() {
    final String domainId = "domain-1";

    final Domain domain = new Domain();
    domain.setId(domainId);

    final User user = new User();
    user.setId("user-id-1");

    doReturn(Maybe.just(domain)).when(domainService).findById(domainId);
    doReturn(Completable.complete())
            .when(scopeApprovalService)
            .revokeByUser(eq(domainId), eq(user.getId()), any());

    final Response revocation =
            target("domains").path(domainId).path("users").path(user.getId()).path("consents").request().delete();

    assertEquals(HttpStatusCode.NO_CONTENT_204, revocation.getStatus());
}
/**
 * Creates a new media session through the factory, stores it in {@code mediaSession}, and then
 * reports it via {@code reportMediaAndState} on the I/O scheduler without waiting for the result.
 */
private void doOpenSession() {
    this.mediaSession = mediaSessionFactory.newMediaSession();

    // The lambda reads the mediaSession field when it runs, not when it is created.
    final Completable report = Completable.fromAction(() -> reportMediaAndState(mediaSession));

    // Fire and forget: the returned Disposable is intentionally not kept.
    report.subscribeOn(schedulersProvider.io()).subscribe();
}
/**
 * Registers an event bus consumer on the address configured under {@code "id"}; every incoming
 * message is answered with {@code "OK-"} followed by that address.
 *
 * @return the consumer's {@code rxCompletionHandler()} {@link Completable}
 */
@Override
public Completable rxStart() {
    final String address = config().getString("id");
    return vertx.eventBus()
            .consumer(address)
            .handler(incoming -> {
                incoming.reply("OK-" + address);
            })
            .rxCompletionHandler();
}
/**
 * Blocks until {@code instance} terminates and reports how it ended.
 *
 * @param instance the {@link Completable} to await
 * @return the {@link Exception} thrown by {@code blockingAwait()}, or {@code null} when the
 *         call returned normally
 */
@Override
protected Exception getFailure(Completable instance) {
    try {
        instance.blockingAwait();
        return null;
    } catch (Exception failure) {
        return failure;
    }
}
/**
 * With a permission granted by the bulkhead, a completed source composed with the bulkhead
 * operator must complete, and the bulkhead must be notified through {@code onComplete()}.
 */
@Test
public void shouldComplete() {
    given(bulkhead.tryAcquirePermission()).willReturn(true);

    final Completable guarded = Completable.complete().compose(BulkheadOperator.of(bulkhead));
    guarded.test()
            .assertSubscribed()
            .assertComplete();

    then(bulkhead).should().onComplete();
}
/**
 * Generates a file name for the photo, publishes the matching remote destination path for
 * {@code field} through {@code photoUpdates}, and hands the bitmap to the storage manager.
 *
 * @param bitmap the photo to store
 * @param field  the form field the photo belongs to
 * @return the {@link Completable} returned by {@code storageManager.savePhoto}
 */
private Completable saveBitmapAndUpdateResponse(Bitmap bitmap, Field field) {
    String fileName = uuidGenerator.generateUuid() + Config.PHOTO_EXT;

    // The remote path is derived from the project, form and feature of the original observation.
    String remotePath = getRemoteDestinationPath(
            originalObservation.getProject().getId(),
            originalObservation.getForm().getId(),
            originalObservation.getFeature().getId(),
            fileName);

    photoUpdates.postValue(ImmutableMap.of(field, remotePath));

    return storageManager.savePhoto(bitmap, fileName);
}
/**
 * Builds a {@link Completable} that, once subscribed, returns {@code client} to {@code pool} on
 * the RxJava I/O scheduler and then completes.
 *
 * @param pool   the pool the client is released to
 * @param client the client to release
 * @return a {@link Completable} subscribed on {@code Schedulers.io()}
 */
private Completable releaseAsync(ClientPool pool, Client client) {
    final Completable release = Completable.create(subscriber -> {
        log.debug("releasing client");
        pool.release(client);
        subscriber.onComplete();
    });
    return release.subscribeOn(Schedulers.io());
}
/**
 * Looks up the account for {@code address} and deletes it from the key store, with the whole
 * chain subscribed on the RxJava I/O scheduler.
 *
 * @param address  address used to find the account
 * @param password password passed to {@code keyStore.deleteAccount}
 * @return a {@link Completable} for the lookup followed by the deletion
 */
@Override
public Completable deleteAccount(String address, String password) {
    return Single
            .fromCallable(() -> {
                return findAccount(address);
            })
            .flatMapCompletable(found -> {
                return Completable.fromAction(() -> keyStore.deleteAccount(found, password));
            })
            .subscribeOn(Schedulers.io());
}
/**
 * Wraps a call to {@code fusedLocationProviderClient.requestLocationUpdates} in a
 * {@link Completable} via {@code RxTask.toCompletable}.
 *
 * <p>The request is made inside a lambda, so {@code Looper.myLooper()} is evaluated on whichever
 * thread runs that lambda, not on the thread that calls this method.
 * NOTE(review): presumably the lambda runs on subscription and callers subscribe from a thread
 * that has a Looper - confirm against {@code RxTask.toCompletable} and the call sites.
 *
 * @param locationRequest  the request passed to the fused location provider client
 * @param locationCallback the callback passed to the fused location provider client
 * @return the {@link Completable} produced by {@code RxTask.toCompletable}
 */
// The lint check for the location permission is suppressed here.
@SuppressLint("MissingPermission")
public Completable requestLocationUpdates(
LocationRequest locationRequest, RxLocationCallback locationCallback) {
return RxTask.toCompletable(
() ->
fusedLocationProviderClient.requestLocationUpdates(
locationRequest, locationCallback, Looper.myLooper()));
}
/**
 * With a permission granted by the circuit breaker, an {@link IOException} from the source must
 * reach the observer, be recorded through {@code onError}, and never be recorded as a success.
 */
@Test
public void shouldPropagateAndMarkError() {
    given(circuitBreaker.tryAcquirePermission()).willReturn(true);

    final Completable failing = Completable.error(new IOException("BAM!"))
            .compose(CircuitBreakerOperator.of(circuitBreaker));
    failing.test()
            .assertSubscribed()
            .assertError(IOException.class)
            .assertNotComplete();

    then(circuitBreaker).should().onError(anyLong(), any(TimeUnit.class), any(IOException.class));
    then(circuitBreaker).should(never()).onSuccess(anyLong(), any(TimeUnit.class));
}
/**
 * Assembles a completing and a failing {@link Completable} while the assembly context is in
 * scope, then subscribes to both via {@code subscribeInDifferentContext}; the callbacks assert
 * that they run in the assembly context.
 */
@Test
public void completable_assembleInScope_subscribeInScope() {
    Completable source;
    Completable errorSource;

    // The scope is only needed for its lifetime; the variable itself is never read.
    try (Scope ignored = currentTraceContext.newScope(assemblyContext)) {
        source = Completable.complete().doOnComplete(() -> assertInAssemblyContext());
        errorSource = Completable.error(new IllegalStateException())
                .doOnError(error -> assertInAssemblyContext())
                .doOnComplete(() -> assertInAssemblyContext());
    }

    subscribeInDifferentContext(source.toObservable(), errorSource.toObservable())
            .assertResult();
}
/**
 * Starts a remote config fetch and activates the fetched values once the fetch completes.
 *
 * <p>{@code config.fetch()} is invoked immediately when this method is called; its task is
 * adapted with {@code RxTask.completes}. The boolean result of {@code activateFetched()} is
 * ignored.
 *
 * @param config the remote config instance to fetch and activate
 * @return a {@link Completable} that calls {@code config.activateFetched()} on completion
 */
@CheckReturnValue
@NonNull
public static Completable fetches(@NonNull final FirebaseRemoteConfig config) {
    final Action activateFetched = new Action() {
        @Override
        public void run() throws Exception {
            config.activateFetched();
        }
    };
    return RxTask.completes(config.fetch()).doOnComplete(activateFetched);
}
/**
 * Defers {@code delegate.delete(entity)} into a {@link Completable}; the delegate is only
 * invoked when the returned instance is subscribed to.
 *
 * @param entity the entity to delete
 * @param <E>    the concrete entity type
 * @return a {@link Completable} wrapping the delegate call
 */
@Override
public <E extends T> Completable delete(final E entity) {
    final Callable<Void> deletion = new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            delegate.delete(entity);
            return null;
        }
    };
    return Completable.fromCallable(deletion);
}
/**
 * Sends a message in three sequential steps and returns a {@link Completable} for the whole
 * chain:
 * <ol>
 *   <li>write {@code message} to the channel's messages collection, using
 *       {@code messagesCount} as the document id;</li>
 *   <li>update the channel document's {@code KEY_PROPERTY_COUNT} to {@code messagesCount + 1};</li>
 *   <li>set the channel id on {@code message} and insert it through {@code roomDb.messageDao()}.</li>
 * </ol>
 * The chain is subscribed on {@code Schedulers.io()} and its terminal event is delivered on the
 * Android main thread.
 *
 * NOTE(review): the new count is computed from the caller-supplied {@code messagesCount} rather
 * than read back from the store - presumably concurrent senders could write the same document
 * id and count; confirm whether that is acceptable.
 *
 * @param message       the message to store remotely and locally
 * @param channelId     id of the channel document the message belongs to
 * @param messagesCount current number of messages in the channel, as known by the caller
 * @return a {@link Completable} that completes after all three steps, or errors with the
 *         exception of the first failing step
 */
private Completable sendMessage(DefaultMessage message, String channelId, Long messagesCount) {
// Step 1: store the message document remotely.
return Completable.create(emitter -> {
firestore.collection(COLLECTION_CHANNELS)
.document(channelId)
.collection(COLLECTION_MESSAGES)
.document(messagesCount + "")
.set(message)
.addOnCompleteListener(task -> {
if (task.isSuccessful()) {
emitter.onComplete();
} else {
// NOTE(review): verify getException() cannot be null for an unsuccessful task here.
emitter.onError(task.getException());
}
});
// Step 2: only after step 1 completes, bump the channel's message counter.
}).andThen(Completable.create(emitter -> {
firestore.collection(COLLECTION_CHANNELS)
.document(channelId)
.update(KEY_PROPERTY_COUNT, messagesCount + 1)
.addOnCompleteListener(task -> {
if (task.isSuccessful())
emitter.onComplete();
else
emitter.onError(task.getException());
});
// Move to the I/O scheduler before the local insert that follows.
})).observeOn(Schedulers.io())
// Step 3: mirror the message into the local database.
.andThen(Completable.create(insertMessageEmitter -> {
message.setChannelId(channelId);
roomDb.messageDao().inertMessage(message);
insertMessageEmitter.onComplete();
})).subscribeOn(Schedulers.io())
// Deliver the final completion or error on the Android main thread.
.observeOn(AndroidSchedulers.mainThread());
}
/**
 * Subscribes to an asynchronous status and answers the request accordingly: the response is
 * ended when the status completes, and {@code internalError} is invoked when it fails or when
 * no status was supplied at all.
 * By default, the successful status code is 200 OK.
 *
 * @param context     routing context
 * @param asyncResult asynchronous status with no result; may be {@code null}
 */
protected void sendResponse(RoutingContext context, Completable asyncResult) {
    final HttpServerResponse httpResponse = context.response();

    if (asyncResult == null) {
        internalError(context, "invalid_status");
        return;
    }

    asyncResult.subscribe(httpResponse::end, failure -> internalError(context, failure));
}
/**
 * Wraps {@code user.updateProfile(request)} in a {@link Completable} through
 * {@code RxTask.completes}. The update call sits inside a {@link Callable}, so it is made when
 * that callable is invoked rather than when this method is called.
 *
 * @param user    the user whose profile is updated
 * @param request the profile change to apply
 * @return the {@link Completable} produced by {@code RxTask.completes}
 */
@CheckReturnValue
@NonNull
public static Completable updateProfile(
        @NonNull final FirebaseUser user, @NonNull final UserProfileChangeRequest request) {
    final Callable<Task<Void>> profileUpdate = new Callable<Task<Void>>() {
        @Override
        public Task<Void> call() throws Exception {
            return user.updateProfile(request);
        }
    };
    return RxTask.completes(profileUpdate);
}
/**
 * Validates this mail and sends it through the globally configured mailer.
 *
 * @return the {@link Completable} returned by the mailer
 * @throws IllegalStateException if none of to, cc and bcc is set, or if the subject is missing;
 *         thrown synchronously, before any {@link Completable} is created
 */
public Completable send() {
    final boolean hasRecipient = to != null || cc != null || bcc != null;
    if (!hasRecipient) {
        throw new IllegalStateException("Missing to, cc or bcc");
    }
    if (subject == null) {
        throw new IllegalStateException("Missing subject");
    }
    return AppGlobals.get().getMailer().send(this);
}
/**
 * Adds a token to the default wallet.
 *
 * @param address  token address
 * @param symbol   token symbol
 * @param decimals number of decimals of the token
 * @return a {@link Completable} for the token insertion, observed on the Android main thread
 */
public Completable add(String address, String symbol, int decimals) {
    return walletRepository.getDefaultWallet().flatMapCompletable(defaultWallet -> {
        return tokenRepository
                .addToken(defaultWallet, address, symbol, decimals)
                .observeOn(AndroidSchedulers.mainThread());
    });
}
/**
 * When the mocked client throws an {@code MqttException} from {@code disconnect}, awaiting the
 * {@link Completable} built by {@code DisconnectFactory} must fail with that exception as cause.
 */
@Test
public void whenCreateIsCalledAndAnErrorOccursThenObserverOnErrorIsCalled()
        throws Throwable {
    this.expectedException.expectCause(isA(MqttException.class));

    final IMqttAsyncClient mqttClient = Mockito.mock(IMqttAsyncClient.class);
    Mockito
            .when(mqttClient.disconnect(
                    Matchers.isNull(),
                    Matchers.any(DisconnectFactory.DisconnectActionListener.class)))
            .thenThrow(new MqttException(MqttException.REASON_CODE_CLIENT_CONNECTED));

    final Completable disconnect = new DisconnectFactory(mqttClient).create();
    disconnect.blockingAwait();
}
/**
 * Applies the default Rx scheduler of {@code storIOSQLite} to {@code completable}, if one is
 * configured.
 *
 * @param storIOSQLite source of the default scheduler
 * @param completable  the instance to schedule
 * @return {@code completable} subscribed on the default scheduler, or {@code completable}
 *         itself when {@code defaultRxScheduler()} returns {@code null}
 */
@CheckResult
@NonNull
public static Completable subscribeOn(
        @NonNull StorIOSQLite storIOSQLite,
        @NonNull Completable completable
) {
    final Scheduler defaultScheduler = storIOSQLite.defaultRxScheduler();
    if (defaultScheduler == null) {
        return completable;
    }
    return completable.subscribeOn(defaultScheduler);
}
/**
 * Builds the startup chain of the server.
 *
 * <p>Calling this method immediately clears and re-initialises {@code AppGlobals}; the rest of
 * the work happens in the returned chain: logging setup, context loading and configuration
 * loading, then {@code initVertx}, {@code setupPlugins}, the template renderers,
 * {@code setupResteasy}, and finally {@code setupSwagger} followed by {@code setupVertx}.
 *
 * <p>Note that {@code setupTemplateRenderers()} and {@code setupResteasy(...)} are themselves
 * invoked while the chain is being assembled, since they are plain method-call arguments.
 * NOTE(review): presumably both defer their real work until subscription - confirm.
 *
 * @param defaultConfig            configuration passed to {@code loadFileConfig}
 * @param resourceOrProviderClasses classes forwarded to {@code setupResteasy}
 * @return a {@link Completable} for the whole startup sequence
 */
public Completable start(JsonObject defaultConfig, Class<?>... resourceOrProviderClasses){
/*
 * OK this sucks: since the rx hooks are static, we can start a second server and the hooks are still there,
 * which means that the new server's Single flow will capture the existing current AppGlobals, so we reset it
 * here, even though it's not correct because it should be in subscribe/create but that'd be too late and that
 * flow would already be polluted with our globals…
 */
AppGlobals.clear();
appGlobals = AppGlobals.init();
return Single.<JsonObject>create(s -> {
setupLogging();
// Propagate the Resteasy/Redpipe/CDI contexts
Context.load();
JsonObject config = loadFileConfig(defaultConfig);
AppGlobals.get().setConfig(config);
s.onSuccess(config);
})
.flatMap(this::initVertx)
.flatMapCompletable(vertx -> {
// Keep the Vert.x instance on this server and publish it through the globals before plugins start.
this.vertx = vertx;
AppGlobals.get().setVertx(this.vertx);
return setupPlugins();
})
.concatWith(setupTemplateRenderers())
.andThen(setupResteasy(resourceOrProviderClasses))
.flatMapCompletable(deployment -> {
setupSwagger(deployment);
return setupVertx(deployment);
});
}
/**
 * When the mocked client throws an {@code MqttException} from {@code connect}, awaiting the
 * {@link Completable} built by {@code ConnectFactory} must fail with that exception as cause.
 */
@Test
public void whenCreateIsCalledAndAnErrorOccursThenObserverOnErrorIsCalled()
        throws Throwable {
    this.expectedException.expectCause(isA(MqttException.class));

    final MqttConnectOptions connectOptions = Mockito.mock(MqttConnectOptions.class);
    final IMqttAsyncClient mqttClient = Mockito.mock(IMqttAsyncClient.class);
    Mockito
            .when(mqttClient.connect(
                    Matchers.same(connectOptions),
                    Matchers.isNull(),
                    Matchers.any(ConnectFactory.ConnectActionListener.class)))
            .thenThrow(new MqttException(MqttException.REASON_CODE_CLIENT_CONNECTED));

    final Completable connect = new ConnectFactory(mqttClient, connectOptions).create();
    connect.blockingAwait();
}
/**
 * Verifies that the user's account may be used.
 *
 * @param user Authenticated user
 * @return an already-completed {@link Completable} when the account is enabled, otherwise one
 *         that fails with an {@code AccountDisabledException}
 */
private Completable checkAccountStatus(User user) {
    if (user.isEnabled()) {
        return Completable.complete();
    }
    return Completable.error(new AccountDisabledException("Account is disabled for user " + user.getUsername()));
}
/**
 * Rejects any request that carries a {@code throw} header and lets every other request pass.
 *
 * @param requestContext the incoming request
 * @return an already-completed {@link Completable} when the header is absent
 * @throws NotAuthorizedException synchronously, when the {@code throw} header is present
 */
@Override
public Completable intercept(ContainerRequestContext requestContext) {
    final boolean mustFail = requestContext.getHeaders().containsKey("throw");
    if (!mustFail) {
        return Completable.complete();
    }
    throw new NotAuthorizedException("Surprise!");
}
/**
 * Renders the text and (optional) HTML bodies of {@code email} and, once both are available,
 * passes them to {@code send(email, text, html)}; the HTML argument is {@code null} when the
 * mail has no HTML rendering.
 *
 * @param email the mail to render and send
 * @return a {@link Completable} that completes after the rendered mail has been handed over
 */
@Override
public Completable send(Mail email) {
    Single<Buffer> textBody = email.renderText();
    // An absent HTML body is represented by an empty Optional so that zip always gets a value.
    Single<Optional<Buffer>> htmlBody = email.renderHtml()
            .map(rendered -> Optional.of(rendered))
            .toSingle(Optional.empty());

    Single<Completable> delivery = Single.zip(textBody, htmlBody, (text, maybeHtml) -> {
        send(email, text, maybeHtml.orElse(null));
        return Completable.complete();
    });
    return delivery.flatMapCompletable(inner -> inner);
}
/**
 * Runs the policy chain built from {@code rules} against {@code executionContext}.
 *
 * @param rules            the rules to resolve into a policy chain
 * @param executionContext the context the chain is executed with
 * @return an already-completed {@link Completable} when there are no rules; otherwise one that
 *         completes when the chain's handler fires and fails with a
 *         {@code PolicyChainException} when its error handler fires
 */
@Override
public Completable fire(List<Rule> rules, ExecutionContext executionContext) {
    if (!rules.isEmpty()) {
        return Completable.create(emitter ->
                policyChainProcessorFactory
                        .create(resolve(rules), executionContext)
                        .handler(ignored -> emitter.onComplete())
                        .errorHandler(failure -> emitter.onError(
                                new PolicyChainException(
                                        failure.message(),
                                        failure.statusCode(),
                                        failure.key(),
                                        failure.parameters(),
                                        failure.contentType())))
                        .handle(executionContext));
    }

    LOGGER.debug("No rules registered!");
    return Completable.complete();
}
/**
 * Starts the verticle through {@code rxStart()} when it supplies a {@link Completable}, and
 * through the parent implementation otherwise.
 *
 * @param startFuture completed when the {@link Completable} completes and failed when it errors
 * @throws Exception if the parent {@code start} throws
 */
@Override
public void start(Promise<Void> startFuture) throws Exception {
    final Completable startup = rxStart();
    if (startup == null) {
        super.start(startFuture);
        return;
    }
    startup.subscribe(startFuture::complete, startFuture::fail);
}