下面列出了 io.reactivex.Maybe#error() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Invokes the wrapped method handler and adapts the reactive result to the RxJava type
 * selected by {@code returnPublisherType} ({@code Flowable}, {@code Observable},
 * {@code Single} or {@code Maybe}).
 *
 * <p>Any throwable raised while invoking or converting is not propagated synchronously;
 * it is returned as an error signal of the same RxJava type.
 *
 * @param argv arguments forwarded to the underlying method handler
 * @return the RxJava object matching {@code returnPublisherType}
 * @throws IllegalArgumentException if {@code returnPublisherType} is not one of the
 *         supported RxJava types
 */
@Override
@SuppressWarnings("unchecked")
public Object invoke(final Object[] argv) {
    try {
        Publisher<Object> publisher = (Publisher<Object>) methodHandler.invoke(argv);
        if (returnPublisherType == Flowable.class) {
            return fluxToFlowable((Flux<Object>) publisher);
        } else if (returnPublisherType == Observable.class) {
            return fluxToObservable((Flux<Object>) publisher);
        } else if (returnPublisherType == Single.class) {
            return monoToSingle((Mono<Object>) publisher);
        } else if (returnPublisherType == Maybe.class) {
            return monoToMaybe((Mono<Object>) publisher);
        } else {
            // Report the configured type itself: calling getClass() on a type token only
            // yields the class of the token, which says nothing about the unsupported type.
            throw new IllegalArgumentException("Unexpected returnPublisherType=" + returnPublisherType);
        }
    } catch (Throwable throwable) {
        if (returnPublisherType == Flowable.class) {
            return Flowable.error(throwable);
        } else if (returnPublisherType == Observable.class) {
            return Observable.error(throwable);
        } else if (returnPublisherType == Single.class) {
            return Single.error(throwable);
        } else if (returnPublisherType == Maybe.class) {
            return Maybe.error(throwable);
        } else {
            // No reactive type to carry the failure: keep the original throwable as the cause.
            throw new IllegalArgumentException("Unexpected returnPublisherType=" + returnPublisherType, throwable);
        }
    }
}
/**
 * Propagates {@code err}, first removing the document matching {@code query} from the
 * {@code "user"} collection when the error is recognised by {@code isIndexViolated}.
 *
 * @param query selector of the document to remove
 * @param err the failure to re-emit
 * @return a source that signals {@code err}
 */
private MaybeSource<? extends JsonObject> deleteIncompleteUser(JsonObject query, Throwable err) {
    if (!isIndexViolated(err)) {
        // Nothing to clean up: just forward the failure.
        return Maybe.error(err);
    }
    return mongoClient
        .rxRemoveDocument("user", query)
        .flatMap(removed -> Maybe.error(err));
}
/**
 * Authenticates a client from an assertion. Only the {@code JWT_BEARER} assertion type is
 * handled: the assertion is validated as a JWT, then its signature is verified with HMAC
 * or with a public key depending on the algorithm declared in the JWT header.
 *
 * @param assertionType the assertion type sent by the client
 * @param assertion the assertion value (a serialized JWT)
 * @param basePath base path forwarded to the JWT validation
 * @return the authenticated client, or an {@code InvalidClientException} error when the
 *         assertion type is missing or unsupported
 */
@Override
public Maybe<Client> assertClient(String assertionType, String assertion, String basePath) {
    InvalidClientException unsupportedAssertionType = new InvalidClientException("Unknown or unsupported assertion_type");

    boolean missingType = assertionType == null || assertionType.isEmpty();
    if (missingType || !JWT_BEARER.equals(assertionType)) {
        return Maybe.error(unsupportedAssertionType);
    }

    Function<JWT, MaybeSource<Client>> verifySignature = jwt -> {
        boolean hmacSigned = JWSAlgorithm.Family.HMAC_SHA.contains(jwt.getHeader().getAlgorithm());
        if (hmacSigned) {
            // client_secret_jwt style authentication (shared secret)
            return validateSignatureWithHMAC(jwt);
        }
        // private_key_jwt style authentication (asymmetric key)
        return validateSignatureWithPublicKey(jwt);
    };

    return this.validateJWT(assertion, basePath).flatMap(verifySignature);
}
/**
 * Parses the JWT bearer assertion and ensures that all required claims are set, as described
 * <a href="https://tools.ietf.org/html/rfc7523#section-3">here</a>: issuer, subject,
 * audience and expiration time must be present, the assertion must not be expired, and the
 * audience must contain the token endpoint published by the discovery configuration.
 *
 * @param assertion jwt as string value.
 * @param basePath base path used to look up the OpenID discovery configuration.
 * @return a {@code Maybe} emitting the parsed JWT when every check passes; otherwise a
 *         {@code Maybe} failing with {@code NOT_VALID}, an {@code InvalidClientException}
 *         (expired assertion) or a {@code ServerErrorException} (token endpoint unavailable).
 */
private Maybe<JWT> validateJWT(String assertion, String basePath) {
    try {
        JWT jwt = JWTParser.parse(assertion);

        // The claims set may be unavailable (the Nimbus API returns null, e.g. for an
        // encrypted JWT that has not been decrypted); reject instead of failing with an NPE.
        if (jwt.getJWTClaimsSet() == null) {
            return Maybe.error(NOT_VALID);
        }

        String iss = jwt.getJWTClaimsSet().getIssuer();
        String sub = jwt.getJWTClaimsSet().getSubject();
        List<String> aud = jwt.getJWTClaimsSet().getAudience();
        Date exp = jwt.getJWTClaimsSet().getExpirationTime();

        if (iss == null || iss.isEmpty() || sub == null || sub.isEmpty() || aud == null || aud.isEmpty() || exp == null) {
            return Maybe.error(NOT_VALID);
        }

        if (exp.before(new Date())) {
            return Maybe.error(new InvalidClientException("assertion has expired"));
        }

        // Check audience, here we expect to have absolute token endpoint path.
        OpenIDProviderMetadata discovery = openIDDiscoveryService.getConfiguration(basePath);
        if (discovery == null || discovery.getTokenEndpoint() == null) {
            return Maybe.error(new ServerErrorException("Unable to retrieve discovery token endpoint."));
        }

        if (!aud.contains(discovery.getTokenEndpoint())) {
            return Maybe.error(NOT_VALID);
        }

        return Maybe.just(jwt);
    } catch (ParseException pe) {
        // Malformed assertion: reported the same way as a semantically invalid one.
        return Maybe.error(NOT_VALID);
    }
}
/**
 * Exchanges the authorization code found in the request parameters for an access token by
 * posting a form-encoded request to the configured access token URI.
 *
 * @param authentication the current authentication, giving access to the request context
 * @return a {@code Maybe} emitting the access token; it fails with a
 *         {@code BadCredentialsException} when the authorization code is missing, when the
 *         response status is not 200, or when the response carries no {@code access_token}
 */
private Maybe<String> authenticate(Authentication authentication) {
    // prepare body request parameters
    final String authorizationCode = authentication.getContext().request().parameters().getFirst(configuration.getCodeParameter());
    if (authorizationCode == null || authorizationCode.isEmpty()) {
        LOGGER.debug("Authorization code is missing, skip authentication");
        return Maybe.error(new BadCredentialsException("Missing authorization code"));
    }

    List<NameValuePair> urlParameters = new ArrayList<>();
    urlParameters.add(new BasicNameValuePair(CLIENT_ID, configuration.getClientId()));
    urlParameters.add(new BasicNameValuePair(CLIENT_SECRET, configuration.getClientSecret()));
    urlParameters.add(new BasicNameValuePair(REDIRECT_URI, (String) authentication.getContext().get(REDIRECT_URI)));
    urlParameters.add(new BasicNameValuePair(CODE, authorizationCode));
    String bodyRequest = URLEncodedUtils.format(urlParameters);

    return client.postAbs(configuration.getAccessTokenUri())
        .putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(bodyRequest.length()))
        .putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED)
        .rxSendBuffer(Buffer.buffer(bodyRequest))
        .toMaybe()
        .map(httpResponse -> {
            if (httpResponse.statusCode() != 200) {
                throw new BadCredentialsException(httpResponse.statusMessage());
            }

            Map<String, String> bodyResponse = URLEncodedUtils.format(httpResponse.bodyAsString());
            String accessToken = bodyResponse.get("access_token");
            if (accessToken == null) {
                // RxJava 2 mappers must not return null; surface a meaningful
                // authentication failure instead of an opaque NullPointerException.
                throw new BadCredentialsException("Missing access token in token endpoint response");
            }
            return accessToken;
        });
}
/**
 * Resolves the end-user for the given token.
 *
 * <ul>
 *   <li>ID token: the user is built from the token value itself.</li>
 *   <li>Access token with ID-token verification enabled in the configuration: the user is
 *       built from the ID token stored in the authentication context, or the call fails
 *       with a {@code BadCredentialsException} when none is stored.</li>
 *   <li>Otherwise: the user is built from the response of the user profile endpoint.</li>
 * </ul>
 *
 * @param token the token obtained for the user
 * @param authentication the current authentication and its context
 * @return a {@code Maybe} emitting the resolved user or failing with a
 *         {@code BadCredentialsException}
 */
private Maybe<User> profile(Token token, Authentication authentication) {
    // only an id_token is available: decode it to create the end-user
    if (TokenTypeHint.ID_TOKEN.equals(token.getTypeHint())) {
        return retrieveUserFromIdToken(token.getValue());
    }

    // access token, but the configuration asks to rely on the id_token
    boolean preferIdToken = TokenTypeHint.ACCESS_TOKEN.equals(token.getTypeHint())
        && configuration.isUseIdTokenForUserInfo();
    if (preferIdToken) {
        if (authentication.getContext().get(ID_TOKEN_PARAMETER) == null) {
            // no suitable value to retrieve user
            return Maybe.error(new BadCredentialsException("No suitable value to retrieve user information"));
        }
        return retrieveUserFromIdToken((String) authentication.getContext().get(ID_TOKEN_PARAMETER));
    }

    // fall back to the UserInfo endpoint to retrieve the user claims
    final String bearer = "Bearer " + token.getValue();
    return client.getAbs(configuration.getUserProfileUri())
        .putHeader(HttpHeaders.AUTHORIZATION, bearer)
        .rxSend()
        .toMaybe()
        .map(userInfoResponse -> {
            if (userInfoResponse.statusCode() != 200) {
                throw new BadCredentialsException(userInfoResponse.statusMessage());
            }
            return createUser(userInfoResponse.bodyAsJsonObject().getMap());
        });
}
/**
 * A {@code Maybe} that fails must fail the promise backing the observer created by
 * {@code MaybeHelper.toObserver}, with the very same throwable instance as cause.
 */
@Test
public void testToMaybeObserverFailure() {
    RuntimeException failure = new RuntimeException();
    Promise<String> target = Promise.promise();
    MaybeObserver<String> sink = MaybeHelper.toObserver(target);

    Maybe.<String>error(failure).subscribe(sink);

    assertTrue(target.future().failed());
    assertSame(failure, target.future().cause());
}
/**
 * Builds a {@code Maybe} that signals the given exception as its error.
 *
 * @param e the exception to emit
 * @return a {@code Maybe} created with {@code Maybe.error(e)}
 */
@Override
protected Maybe createInstanceFailingImmediately(RuntimeException e) {
    final Maybe failing = Maybe.error(e);
    return failing;
}
/**
 * Builds a {@code Maybe} that signals the given exception as its error.
 *
 * @param e the exception to emit
 * @return a {@code Maybe} created with {@code Maybe.error(e)}
 */
@Override
protected Maybe createInstanceFailingImmediately(RuntimeException e) {
    final Maybe failing = Maybe.error(e);
    return failing;
}
/**
 * Resource method mapped to {@code GET error} that always answers with a {@code Maybe}
 * failing with a {@code BadRequestException}.
 *
 * @return a {@code Maybe} that signals a new {@code BadRequestException}
 */
@GET
@Path("error")
public Maybe<String> error() {
    final BadRequestException failure = new BadRequestException();
    return Maybe.error(failure);
}