下面列出了 io.reactivex.Single 类的 API 用法、实例代码及写法；也可以点击链接到 GitHub 查看源代码。
/**
 * Verifies that creating a permission ticket for several resources fails with an
 * {@code invalid_scope} error when a stored resource does not carry the requested scopes,
 * and that nothing is persisted in that case.
 */
@Test
public void create_errorMultipleResource_missingScope() {
    // Prepare request
    List<PermissionRequest> request = Arrays.asList(
            new PermissionRequest().setResourceId("one").setResourceScopes(Arrays.asList("a","b")),
            new PermissionRequest().setResourceId("two").setResourceScopes(Arrays.asList("c","d"))
    );
    // Prepare Resource: resource "two" is stubbed with scopes that differ from the requested ones
    List<Resource> found = Arrays.asList(
            new Resource().setId("one").setResourceScopes(Arrays.asList("a","b")),
            new Resource().setId("two").setResourceScopes(Arrays.asList("not","same"))
    );
    when(resourceService.findByDomainAndClientAndResources(DOMAIN_ID, CLIENT_ID,Arrays.asList("one","two"))).thenReturn(Single.just(found));

    TestObserver<PermissionTicket> testObserver = service.create(request, DOMAIN_ID, CLIENT_ID).test();

    testObserver.assertNotComplete();
    // Check the type before casting: an unexpected error type must fail the assertion cleanly
    // rather than blow up with a ClassCastException inside the predicate.
    testObserver.assertError(err -> err instanceof InvalidPermissionRequestException
            && "invalid_scope".equals(((InvalidPermissionRequestException) err).getOAuth2ErrorCode()));
    verify(repository, times(0)).create(any());
}
/**
 * Looks up one page of users for the current domain and wraps it in a {@link ListResponse}.
 *
 * @param page    page index handed to the repository
 * @param size    requested page size; zero or a negative value yields a response without resources
 * @param baseUrl base URL forwarded to {@code convert} for each user
 * @return the list response, or a {@link TechnicalManagementException} error if the lookup fails
 */
@Override
public Single<ListResponse<User>> list(int page, int size, String baseUrl) {
    LOGGER.debug("Find users by domain: {}", domain.getId());
    return userRepository.findByDomain(domain.getId(), page, size)
            .flatMap(userPage -> {
                // A negative value SHALL be interpreted as "0".
                // A value of "0" indicates that no resource results are to be returned except for "totalResults".
                if (size <= 0) {
                    return Single.just(new ListResponse<User>(null, userPage.getCurrentPage() + 1, userPage.getTotalCount(), 0));
                }
                // SCIM uses a 1-based index, hence the incremented current page below.
                return Observable.fromIterable(userPage.getData())
                        .map(rawUser -> convert(rawUser, baseUrl, true))
                        // attach the groups to every converted user
                        .flatMapSingle(scimUser -> setGroups(scimUser))
                        .toList()
                        .map(scimUsers -> new ListResponse<>(scimUsers, userPage.getCurrentPage() + 1, userPage.getTotalCount(), scimUsers.size()));
            })
            .onErrorResumeNext(failure -> {
                LOGGER.error("An error occurs while trying to find users by domain {}", domain, failure);
                return Single.error(new TechnicalManagementException(String.format("An error occurs while trying to find users by domain %s", domain), failure));
            });
}
/**
 * Demonstrates subscribing to a {@link Single} with a full {@link SingleObserver}:
 * the subscription state, the emitted value and any failure are all written to the log.
 */
private void single() {
    Single.just(1).subscribe(new SingleObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe: " + d.isDisposed());
        }

        @Override
        public void onSuccess(Integer integer) {
            Log.d(TAG, "onSuccess: " + integer);
        }

        @Override
        public void onError(Throwable e) {
            // Never swallow a failure silently; surface it in the log together with its stack trace.
            Log.e(TAG, "onError: ", e);
        }
    });
}
/**
 * Verifies that the token endpoint answers 500 "Internal Server Error" when the token granter
 * fails with an exception for an otherwise valid client-credentials request.
 */
@Test
public void shouldInvokeTokenEndpoint_withValidClientCredentials_noAccessToken() throws Exception {
    Client client = new Client();
    client.setClientId("my-client");
    client.setScopes(Collections.singletonList("read"));

    // Put the client into the routing context before any other handler runs.
    router.route().order(-1).handler(context -> {
        context.put("client", client);
        context.next();
    });

    when(tokenGranter.grant(any(TokenRequest.class), any(io.gravitee.am.model.oidc.Client.class)))
            .thenReturn(Single.error(new Exception()));

    testRequest(
            HttpMethod.POST,
            "/oauth/token?client_id=my-client&client_secret=my-secret&grant_type=client_credentials",
            HttpStatusCode.INTERNAL_SERVER_ERROR_500,
            "Internal Server Error");
}
/**
 * Implements a unary → stream call as {@link Single} → {@link Flowable}, where the server responds with a
 * stream of messages.
 *
 * @param rxRequest the single request message
 * @param delegate  invoked with the request and the observer that receives the response stream
 * @param options   call options from which the prefetch and low-tide values are read
 * @return the response stream, or an errored {@link Flowable} if assembling the call throws
 */
public static <TRequest, TResponse> Flowable<TResponse> oneToMany(
        final Single<TRequest> rxRequest,
        final BiConsumer<TRequest, StreamObserver<TResponse>> delegate,
        final CallOptions options) {
    try {
        final int prefetch = RxCallOptions.getPrefetch(options);
        final int lowTide = RxCallOptions.getLowTide(options);

        // Once the request is available, hand it to the delegate together with a fresh
        // observer/publisher that is returned as the response stream.
        final io.reactivex.functions.Function<TRequest, Publisher<? extends TResponse>> invokeDelegate =
                new io.reactivex.functions.Function<TRequest, Publisher<? extends TResponse>>() {
                    @Override
                    public Publisher<? extends TResponse> apply(TRequest request) {
                        final RxClientStreamObserverAndPublisher<TResponse> responseStream =
                                new RxClientStreamObserverAndPublisher<TResponse>(null, null, prefetch, lowTide);
                        delegate.accept(request, responseStream);
                        return responseStream;
                    }
                };

        return rxRequest.flatMapPublisher(invokeDelegate);
    } catch (Throwable assemblyFailure) {
        return Flowable.error(assemblyFailure);
    }
}
/**
 * Fetches {@code /robots.txt} from www.google.com over HTTPS inside a fiber and returns its body.
 *
 * <p>The web client is closed in a {@code finally} block so that it is released even when
 * sending the request or awaiting the response fails.
 *
 * @param rxVertx the Vert.x instance used to create the web client
 * @return a {@link Single} emitting a 200 response whose entity is the fetched body
 */
@Path("coroutines/1")
@GET
public Single<Response> helloAsync(@Context io.vertx.reactivex.core.Vertx rxVertx){
    return Fibers.fiber(() -> {
        System.err.println("Creating client");
        WebClientOptions options = new WebClientOptions();
        options.setSsl(true);
        options.setTrustAll(true);
        options.setVerifyHost(false);
        WebClient client = WebClient.create(rxVertx, options);
        try {
            Single<HttpResponse<io.vertx.reactivex.core.buffer.Buffer>> responseHandler = client.get(443,
                    "www.google.com",
                    "/robots.txt").rxSend();

            // NOTE(review): this message is printed before the response is actually awaited.
            System.err.println("Got response");

            HttpResponse<io.vertx.reactivex.core.buffer.Buffer> httpResponse = Fibers.await(responseHandler);
            System.err.println("Got body");

            return Response.ok(httpResponse.body().toString()).build();
        } finally {
            // Previously the client was only closed on the success path and leaked on failure.
            client.close();
        }
    });
}
/**
 * Inserts the given operation into the database.
 *
 * <p>A connection is retrieved, the insertion statement is executed with the encoded operation
 * as its only parameter, and the connection is closed once the insertion has terminated.
 * Failures are reported on standard error.
 *
 * @param operation the operation to store
 */
private void storeInDatabase(JsonObject operation) {
    jdbc.rxGetConnection()
            // The connection may have failed to be retrieved; the insertion only runs upon success.
            .flatMap(conn -> {
                Single<UpdateResult> insertion =
                        conn.rxUpdateWithParams(INSERT_STATEMENT, new JsonArray().add(operation.encode()));
                // Close the connection when the insertion is done.
                return insertion.doAfterTerminate(conn::close);
            })
            .subscribe(
                    inserted -> {
                        // Ok
                    },
                    failure -> {
                        System.err.println("Failed to insert operation in database: " + failure);
                    });
}
/**
 * Verifies that updating an existing domain completes without error and triggers exactly one
 * lookup, one repository update and one event creation.
 */
@Test
public void shouldUpdate() {
    // given: the domain exists and both the update and the event creation succeed
    final UpdateDomain domainPatch = Mockito.mock(UpdateDomain.class);
    when(domainRepository.findById("my-domain")).thenReturn(Maybe.just(new Domain()));
    when(domainRepository.update(any(Domain.class))).thenReturn(Single.just(new Domain()));
    when(eventService.create(any())).thenReturn(Single.just(new Event()));

    // when
    TestObserver observer = domainService.update("my-domain", domainPatch).test();
    observer.awaitTerminalEvent();

    // then
    observer.assertComplete();
    observer.assertNoErrors();
    // verify(mock) checks for exactly one invocation, same as times(1)
    verify(domainRepository).findById(anyString());
    verify(domainRepository).update(any(Domain.class));
    verify(eventService).create(any());
}
/**
 * Loads one page of the target user's starred repositories.
 *
 * <p>When a star counter holder exists but holds no value or a negative one, the starred count is
 * requested alongside the page and stored in the holder before the page is delivered.
 *
 * @param page          the page to load
 * @param onCallApiDone callback receiving the last page, whether {@code page} is 1, and the items
 */
@Override
protected void callApi(int page, OnCallApiDone<Repo> onCallApiDone) {
    // True when there is nothing to refresh: no holder at all, or a non-negative value already set.
    final boolean skipCount = staredCount == null
            || (staredCount.getValue() != null && staredCount.getValue() >= 0);

    Single<Pageable<Repo>> single;
    if (skipCount) {
        single = userRestService.getStarred(targetUser, page);
    } else {
        single = Single.zip(
                userRestService.getStarred(targetUser, page),
                userRestService.getStarredCount(targetUser),
                (starredPage, countPage) -> {
                    if (countPage != null) {
                        staredCount.setValue(countPage.getLast());
                    }
                    return starredPage;
                });
    }

    execute(true, single,
            starredPage -> onCallApiDone.onDone(starredPage.getLast(), page == 1, starredPage.getItems()));
}
/**
 * Verifies that deleting an existing user completes without error, removes the user from the
 * repository once and creates one event.
 */
@Test
public void shouldDelete() {
    // given: a domain-scoped user that can be found and deleted
    final User existingUser = new User();
    existingUser.setReferenceType(ReferenceType.DOMAIN);
    existingUser.setReferenceId(DOMAIN);
    when(userRepository.findById("my-user")).thenReturn(Maybe.just(existingUser));
    when(userRepository.delete("my-user")).thenReturn(Completable.complete());
    when(eventService.create(any())).thenReturn(Single.just(new Event()));

    // when
    TestObserver observer = userService.delete("my-user").test();
    observer.awaitTerminalEvent();

    // then
    observer.assertComplete();
    observer.assertNoErrors();
    // verify(mock) checks for exactly one invocation, same as times(1)
    verify(userRepository).delete("my-user");
    verify(eventService).create(any());
}
/**
 * Runs an insert built by {@code queryFunction} and maps the returned key with {@code keyMapper}.
 *
 * <p>When {@code isMysql} is set, the statement is run as an update and the mapper receives the
 * update result's keys; otherwise it is run as a query and the mapper receives the first result row.
 *
 * @param queryFunction builds the insert statement from the DSL context
 * @param keyMapper     converts the returned key data into {@code T}
 * @return a {@link Single} emitting the mapped key
 */
@Override
@SuppressWarnings("unchecked")
public Single<T> insertReturning(Function<DSLContext, ? extends InsertResultStep<R>> queryFunction, Function<Object, T> keyMapper) {
    final Query insert = createQuery(queryFunction);
    log(insert);
    final String statement = insert.getSQL();
    final JsonArray params = getBindValues(insert);

    Function<SQLConnection, Single<? extends T>> runInsertReturning;
    if (!isMysql) {
        // Run as a query and read the key from the first returned row.
        runInsertReturning = connection -> connection
                .rxQueryWithParams(statement, params)
                .map(rows -> keyMapper.apply(rows.getResults().get(0)));
    } else {
        // Run as an update and read the keys from the update result.
        runInsertReturning = connection -> connection
                .rxUpdateWithParams(statement, params)
                .map(updated -> keyMapper.apply(updated.getKeys()));
    }
    return getConnection().flatMap(executeAndClose(runInsertReturning));
}
/**
 * Signs and submits a transaction on the default network.
 *
 * <p>The sender's latest transaction count is used as the nonce, the transaction is signed
 * through the keystore service and then sent as a raw transaction. The chain is subscribed on the
 * IO scheduler.
 *
 * @return a {@link Single} emitting the transaction hash, or a {@link ServiceException} error
 *         when the node reports an error for the raw transaction
 */
@Override
public Single<String> createTransaction(Wallet from, String toAddress, BigInteger subunitAmount, BigInteger gasPrice, BigInteger gasLimit, byte[] data, String password) {
    final Web3j web3j = Web3jFactory.build(new HttpService(networkRepository.getDefaultNetwork().rpcServerUrl));

    // Step 1: the sender's latest transaction count serves as the nonce.
    final Single<BigInteger> nonceLookup = Single.fromCallable(() -> web3j
            .ethGetTransactionCount(from.address, DefaultBlockParameterName.LATEST)
            .send()
            .getTransactionCount());

    return nonceLookup
            // Step 2: sign the transaction with the keystore service.
            .flatMap(nonce -> accountKeystoreService.signTransaction(
                    from, password, toAddress, subunitAmount, gasPrice, gasLimit,
                    nonce.longValue(), data, networkRepository.getDefaultNetwork().chainId))
            // Step 3: submit the signed payload and surface node-side errors.
            .flatMap(signedMessage -> Single.fromCallable(() -> {
                EthSendTransaction sent = web3j
                        .ethSendRawTransaction(Numeric.toHexString(signedMessage))
                        .send();
                if (sent.hasError()) {
                    throw new ServiceException(sent.getError().getMessage());
                }
                return sent.getTransactionHash();
            }))
            .subscribeOn(Schedulers.io());
}
/**
 * Verifies that a permission ticket is created when every requested resource is found with
 * exactly the requested scopes.
 */
@Test
public void create_successMultipleResources() {
    // Prepare request
    List<PermissionRequest> request = Arrays.asList(
            new PermissionRequest().setResourceId("one").setResourceScopes(Arrays.asList("a","b")),
            new PermissionRequest().setResourceId("two").setResourceScopes(Arrays.asList("c","d"))
    );
    // Prepare Resource: mirror each request entry so ids and scopes match one-to-one
    List<Resource> found = request.stream()
            .map(requested -> new Resource()
                    .setId(requested.getResourceId())
                    .setResourceScopes(requested.getResourceScopes()))
            .collect(Collectors.toList());

    when(resourceService.findByDomainAndClientAndResources(DOMAIN_ID, CLIENT_ID, Arrays.asList("one","two")))
            .thenReturn(Single.just(found));
    when(repository.create(any()))
            .thenReturn(Single.just(new PermissionTicket().setId("success")));

    TestObserver<PermissionTicket> observer = service.create(request, DOMAIN_ID, CLIENT_ID).test();

    observer.assertNoErrors()
            .assertComplete()
            .assertValue(ticket -> "success".equals(ticket.getId()));
    // verify(mock) checks for exactly one invocation, same as times(1)
    verify(repository).create(any());
}
/**
 * Updates the markdown content of the page with the given id.
 *
 * <p>Responds with 404 when no such page exists. Otherwise the new content is stored, a
 * {@code page.saved} event carrying the page id and the client is published on the event bus,
 * and a JSON success document is returned.
 *
 * @param id    the page id from the path
 * @param page  request body; its {@code markdown} and {@code client} fields are read
 * @param req   the current HTTP request (not used by this method)
 * @param vertx the Vert.x instance whose event bus receives the notification
 */
@RequiresPermissions("update")
@PUT
@Path("pages/{id}")
public Single<Response> apiUpdatePage(@PathParam("id") String id,
        @ApiUpdateValid("markdown") JsonObject page,
        @Context HttpServerRequest req,
        @Context Vertx vertx){
    return Fibers.fiber(() -> {
        Optional<Pages> existing = Fibers.await(dao.findOneById(Integer.valueOf(id)));
        if (!existing.isPresent()) {
            return Response.status(Status.NOT_FOUND).build();
        }

        Fibers.await(dao.update(existing.get().setContent(page.getString("markdown"))));

        // Let listeners know which page was saved and by which client.
        vertx.eventBus().publish("page.saved", new JsonObject()
                .put("id", id)
                .put("client", page.getString("client")));

        return Response.ok(new JsonObject().put("success", true)).build();
    });
}
/**
 * Reads the pending balance of the wallet on the default network.
 *
 * @param wallet the wallet whose address is queried
 * @return a {@link Single}, subscribed on the IO scheduler, emitting the balance, or {@code -1}
 *         when the request fails with an {@link IOException}
 */
@Override
public Single<BigDecimal> balanceInWei(Wallet wallet) {
    return Single.fromCallable(() -> {
        try {
            return new BigDecimal(
                    getWeb3jService(networkRepository.getDefaultNetwork().chainId)
                            .ethGetBalance(wallet.address, DefaultBlockParameterName.PENDING)
                            .send()
                            .getBalance());
        } catch (IOException e) {
            // I/O failures are reported to the caller as the sentinel value -1.
            return BigDecimal.valueOf(-1);
        }
    }).subscribeOn(Schedulers.io());
}
/**
 * Verifies that client registration fails with a single {@link InvalidClientMetadataException}
 * whose message starts with "Unable to parse sector_identifier_uri" when the sector identifier
 * URI is fetched through a mocked HTTP response.
 */
@Test
public void create_sectorIdentifierUriBadRequest() {
    final String sectorUri = "https://sector/uri";

    DynamicClientRegistrationRequest request = new DynamicClientRegistrationRequest();
    request.setRedirectUris(Optional.empty());
    request.setSectorIdentifierUri(Optional.of(sectorUri)); // fail due to invalid url

    // The web client hands back a plain mocked response for the sector URI.
    HttpRequest<Buffer> sectorRequest = Mockito.mock(HttpRequest.class);
    HttpResponse sectorResponse = Mockito.mock(HttpResponse.class);
    when(webClient.getAbs(sectorUri)).thenReturn(sectorRequest);
    when(sectorRequest.rxSend()).thenReturn(Single.just(sectorResponse));

    TestObserver<Client> observer = dcrService.create(request, BASE_PATH).test();

    observer.assertError(InvalidClientMetadataException.class);
    observer.assertNotComplete();
    assertTrue("Should have only one exception", observer.errorCount()==1);
    assertTrue("Unexpected start of error message",
            observer.errors().get(0).getMessage().startsWith("Unable to parse sector_identifier_uri"));
}
/**
 * Collects the applications that have at least one access token.
 *
 * <p>All applications are loaded in a single page, the token totals are resolved per
 * application, and entries with no access tokens are dropped.
 *
 * @return the set of top applications, or a {@link TechnicalManagementException} error if any
 *         step fails
 */
@Override
public Single<Set<TopApplication>> findTopApplications() {
    LOGGER.debug("Find top applications");
    return applicationRepository.findAll(0, Integer.MAX_VALUE)
            .flatMapObservable(applicationPage -> Observable.fromIterable(applicationPage.getData()))
            .flatMapSingle(app -> tokenService.findTotalTokensByApplication(app)
                    .map(totals -> {
                        TopApplication entry = new TopApplication();
                        entry.setApplication(app);
                        entry.setAccessTokens(totals.getTotalAccessTokens());
                        return entry;
                    }))
            .toList()
            // keep only the applications that actually own access tokens
            .map(entries -> entries.stream()
                    .filter(entry -> entry.getAccessTokens() > 0)
                    .collect(Collectors.toSet()))
            .onErrorResumeNext(failure -> {
                LOGGER.error("An error occurs while trying to find top applications", failure);
                return Single.error(new TechnicalManagementException("An error occurs while trying to find top applications", failure));
            });
}
/**
 * Builds a {@link Single} that, on subscription, executes the callable statement expecting three
 * out parameters and maps them to the requested classes.
 *
 * @param stmt                  wrapper holding the callable statement to execute
 * @param parameters            parameter values passed to {@code execute}
 * @param parameterPlaceholders placeholders passed to {@code execute}
 * @param cls1                  target class of the first out parameter
 * @param cls2                  target class of the second out parameter
 * @param cls3                  target class of the third out parameter
 * @return a {@link Single} emitting the three mapped values as a tuple
 */
private static <T1, T2, T3> Single<Tuple3<T1, T2, T3>> createWithThreeParameters(NamedCallableStatement stmt,
        List<Object> parameters, List<ParameterPlaceholder> parameterPlaceholders, Class<T1> cls1, Class<T2> cls2,
        Class<T3> cls3) {
    return Single.fromCallable(() -> {
        CallableStatement callable = stmt.stmt;
        List<PlaceAndType> outParams = execute(stmt, parameters, parameterPlaceholders, 3, callable);

        // Each out parameter is looked up right before it is mapped, in declaration order.
        PlaceAndType first = outParams.get(0);
        T1 value1 = Util.mapObject(callable, cls1, first.pos, first.type);

        PlaceAndType second = outParams.get(1);
        T2 value2 = Util.mapObject(callable, cls2, second.pos, second.type);

        PlaceAndType third = outParams.get(2);
        T3 value3 = Util.mapObject(callable, cls3, third.pos, third.type);

        return Tuple3.create(value1, value2, value3);
    });
}
/**
 * Verifies that updating a resource fails with {@link ScopeNotFoundException} and persists
 * nothing when the scope lookup returns no scopes.
 */
@Test
public void update_scopeNotFound() {
    // given: the resource exists but the scope lookup comes back empty
    NewResource update = new JsonObject("{\"resource_scopes\":[\"scope\"]}").mapTo(NewResource.class);
    Resource storedResource = new Resource().setId(RESOURCE_ID).setDomain(DOMAIN_ID);
    when(scopeService.findByDomainAndKeys(DOMAIN_ID, Arrays.asList("scope")))
            .thenReturn(Single.just(Collections.emptyList()));
    when(repository.findByDomainAndClientAndUserAndResource(DOMAIN_ID, CLIENT_ID, USER_ID, RESOURCE_ID))
            .thenReturn(Maybe.just(storedResource));

    // when
    TestObserver<Resource> observer = service.update(update, DOMAIN_ID, CLIENT_ID, USER_ID, RESOURCE_ID).test();

    // then
    observer.assertError(ScopeNotFoundException.class);
    verify(repository, times(0)).update(any());
}
/**
 * Fetches the ATs of an account over gRPC and wraps each returned entry in an {@link AT}.
 *
 * @param accountId the account whose ATs are requested
 * @return a {@link Single} emitting the account's ATs as an array
 */
@Override
public Single<AT[]> getAccountATs(BurstAddress accountId) {
    return assign(() -> brsGrpc.getAccountATs(getAccountRequestFromId(accountId)))
            .map(response -> {
                // Convert every entry of the response into the local AT type.
                return response.getAtsList()
                        .stream()
                        .map(AT::new)
                        .toArray(AT[]::new);
            });
}
/**
 * Wraps {@code loginInternal} in a {@link Single}; the login runs when the single is subscribed.
 *
 * @param username  the user name passed to {@code loginInternal}
 * @param password  the password passed to {@code loginInternal}
 * @param serverUrl the server URL passed to {@code loginInternal}
 * @return a {@link Single} emitting the logged-in user, or the thrown {@link Throwable} as error
 */
public Single<User> logIn(final String username, final String password, final String serverUrl) {
    return Single.create(subscriber -> {
        try {
            final User loggedIn = loginInternal(username, password, serverUrl);
            subscriber.onSuccess(loggedIn);
        } catch (Throwable failure) {
            // Anything thrown while logging in or emitting is forwarded as the error signal.
            subscriber.onError(failure);
        }
    });
}
/**
 * Verifies that adding a membership fails with {@link RoleNotFoundException} and creates nothing
 * when the referenced role cannot be found.
 */
@Test
public void shouldNotCreate_roleNotFound() {
    // given: a user membership on the domain pointing at role "role-id"
    final Membership membership = new Membership();
    membership.setReferenceId(DOMAIN_ID);
    membership.setReferenceType(ReferenceType.DOMAIN);
    membership.setMemberId("user-id");
    membership.setMemberType(MemberType.USER);
    membership.setRoleId("role-id");

    final User member = new User();
    member.setReferenceId(ORGANIZATION_ID);
    member.setReferenceType(ReferenceType.ORGANIZATION);

    // Only the id of this role is used below; the lookup itself is stubbed to return nothing.
    final Role missingRole = new Role();
    missingRole.setId("role-id");
    missingRole.setReferenceId("master-domain");
    missingRole.setReferenceType(ReferenceType.DOMAIN);
    missingRole.setAssignableType(ReferenceType.DOMAIN);

    when(userService.findById(ReferenceType.ORGANIZATION, ORGANIZATION_ID, membership.getMemberId()))
            .thenReturn(Single.just(member));
    when(roleService.findById(missingRole.getId())).thenReturn(Maybe.empty());
    when(membershipRepository.findByReferenceAndMember(
            membership.getReferenceType(), membership.getReferenceId(),
            membership.getMemberType(), membership.getMemberId()))
            .thenReturn(Maybe.empty());

    // when
    TestObserver observer = membershipService.addOrUpdate(ORGANIZATION_ID, membership).test();
    observer.awaitTerminalEvent();

    // then
    observer.assertNotComplete();
    observer.assertError(RoleNotFoundException.class);
    verify(membershipRepository, never()).create(any());
}
/**
 * Executes the batch insertion through the SQL driver.
 *
 * @return a {@link Single} emitting the driver's batch insertion result, or a result with a
 *         count of zero when there are no values to insert
 */
@Override
protected final Single<BatchInsertionResult> executeSql() {
    LOG.debug("返回的是插入的条数");
    final List<Map> rows = getValues();
    if (rows != null && !rows.isEmpty()) {
        return getSqlDriver().batchInsert(this);
    }
    // Nothing to insert: warn and report a zero count without touching the driver.
    LOG.warn("没什么可以插入的数据,什么也不做");
    return Single.just(new BatchInsertionResult().setCount(0));
}
/**
 * Adapts a non-void {@link Task} to an Rx {@link Single}.
 *
 * <p>The supplier is only called when the single is subscribed. Successful results are passed to
 * {@code onNullableSuccess} together with the emitter; task failures are forwarded as the error.
 *
 * @param task supplies the task to run for each subscription
 * @return a {@link Single} reflecting the outcome of the supplied task
 */
public static <T> Single<T> toSingle(Supplier<Task<T>> task) {
    return Single.create(emitter -> {
        final Task<T> pending = task.get();
        pending
                .addOnSuccessListener(value -> onNullableSuccess(value, emitter))
                .addOnFailureListener(emitter::onError);
    });
}
/**
 * Verifies that a {@code Uni} created from a {@code null} item converts to a {@link Single}
 * that emits {@link Optional#empty()} and completes.
 */
@Test
public void testCreatingASingleFromNull() {
    Single<Optional<Integer>> converted = Uni.createFrom()
            .item((Integer) null)
            .convert()
            .with(UniRxConverters.toSingle());

    assertThat(converted).isNotNull();
    converted.test()
            .assertValue(Optional.empty())
            .assertComplete();
}
/**
 * Indexes a document, waits two seconds, deletes it with an immediate refresh and asserts the
 * delete response.
 */
@Test
public void testDeleteRx2(TestContext testContext) throws Exception {
    final Async async = testContext.async(2);

    final JsonObject source = new JsonObject()
            .put("user", source_user)
            .put("message", source_message)
            .put("obj", new JsonObject()
                    .put("array", new JsonArray().add("1").add("2")));

    final UUID documentId = UUID.randomUUID();
    final IndexOptions indexOptions = new IndexOptions().setId(documentId.toString());

    rx2Service.index(index, type, source, indexOptions)
            // pause for two seconds before continuing with the delete
            .flatMap(indexResponse -> {
                return Single.create(emitter -> {
                    vertx.setTimer(2000L, timerId -> emitter.onSuccess(indexResponse));
                });
            })
            .flatMap(delayed -> rx2Service.delete(
                    index, type, documentId.toString(),
                    new DeleteOptions().setRefresh(RefreshPolicy.IMMEDIATE)))
            .subscribe(
                    deleteResponse -> {
                        assertDelete(testContext, deleteResponse, documentId);
                        async.complete();
                    },
                    failure -> testContext.fail(failure));
}
/**
 * Subscribes to a {@link Single} that always fails, with an error handler that itself throws,
 * then keeps the test thread alive for 50 seconds.
 */
@Test
public void test() throws Exception {
    // Source that throws as soon as it is subscribed.
    Single<Object> failing = Single.fromCallable(() -> { throw new RuntimeException(); });

    failing
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe(
                    ignored -> { },
                    cause -> { throw new RuntimeException(cause); });

    Thread.sleep(50000);
}
/**
 * Imports a private key into the keystore.
 *
 * <p>The key is turned into a light wallet file encrypted with {@code newPassword}, serialized to
 * JSON and handed to {@code importKeystore}. All of this happens lazily on subscription: the
 * keystore JSON is chained with {@code flatMap} rather than resolved with a blocking call while
 * the {@link Single} is being assembled, so failures (e.g. a malformed key) are delivered through
 * {@code onError} rather than thrown at the caller.
 *
 * @param privateKey  the private key, encoded in radix {@code PRIVATE_KEY_RADIX}
 * @param newPassword the password used to encrypt the wallet file; it is also passed to
 *                    {@code importKeystore} as both of its password arguments
 * @return a {@link Single} emitting the imported wallet
 */
@Override
public Single<Wallet> importPrivateKey(String privateKey, String newPassword) {
    return Single.fromCallable(() -> {
        BigInteger key = new BigInteger(privateKey, PRIVATE_KEY_RADIX);
        ECKeyPair keypair = ECKeyPair.create(key);
        WalletFile wFile = org.web3j.crypto.Wallet.createLight(newPassword, keypair);
        return objectMapper.writeValueAsString(wFile);
    }).flatMap(keystoreJson -> importKeystore(keystoreJson, newPassword, newPassword));
}
/**
 * Initialize TrueTime.
 * See {@link #initializeNtp(String)} for details on working.
 *
 * <p>When TrueTime is already initialized at call time, the current time is returned right away;
 * otherwise the NTP initialization is run first and the current time is read once it succeeds.
 *
 * @param ntpPoolAddress the NTP pool address passed to {@link #initializeNtp(String)}
 * @return accurate NTP Date
 */
public Single<Date> initializeRx(String ntpPoolAddress) {
    if (isInitialized()) {
        return Single.just(now());
    }

    return initializeNtp(ntpPoolAddress).map(new Function<long[], Date>() {
        @Override
        public Date apply(long[] ntpResult) throws Exception {
            // The raw NTP result is not needed here; a successful init makes now() usable.
            return now();
        }
    });
}
/**
 * Builds a {@link DeleteSMSSubmission} from the given identifiers when subscribed.
 *
 * @param uid          stored as the submission's event
 * @param user         stored as the submission's user id
 * @param submissionId stored as the submission id
 * @return a {@link Single} emitting the populated submission
 */
@Override
Single<? extends SMSSubmission> convert(@NonNull String uid, String user, int submissionId) {
    return Single.fromCallable(() -> {
        final DeleteSMSSubmission deletion = new DeleteSMSSubmission();
        deletion.setSubmissionID(submissionId);
        deletion.setUserID(user);
        deletion.setEvent(uid);
        return deletion;
    });
}