下面列出了io.reactivex.SingleTransformer#io.vertx.reactivex.core.buffer.Buffer 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public CompletionStage<String> writeFile() {
CompletableFuture<String> future = new CompletableFuture<>();
StringBuffer sb = new StringBuffer("id,name,surname");
sb.append("\n");
Observable.fromIterable(customerList)
.map(c -> c.getId() + "," + c.getName() + "," + c.getSurname() + "\n")
.subscribe(
data -> sb.append(data),
error -> System.err.println(error),
() -> vertx.fileSystem().writeFile(path, Buffer.buffer(sb.toString()), handler -> {
if (handler.succeeded()) {
future.complete("File written in "+path);
} else {
System.err.println("Error while writing in file: " + handler.cause().getMessage());
}
}));
return future;
}
@Override
public Completable send(Mail email) {
Single<Optional<Buffer>> htmlRender = email.renderHtml().map(buffer -> Optional.of(buffer)).toSingle(Optional.empty());
Single<Buffer> textRender = email.renderText();
return Single.zip(textRender, htmlRender, (text, html) -> {
MailMessage message = new MailMessage();
message.setFrom(email.from);
if(email.to != null)
message.setTo(Arrays.asList(email.to));
if(email.cc != null)
message.setCc(Arrays.asList(email.cc));
if(email.bcc != null)
message.setBcc(Arrays.asList(email.bcc));
message.setSubject(email.subject);
message.setText(text.toString());
if(html.isPresent())
message.setHtml(html.get().toString());
return mailClient.rxSendMail(message).ignoreElement();
}).flatMapCompletable(c -> c);
}
private void checkRequest(int expectedStatus, String expectedBody, String url, TestContext context, String authHeader) {
Async async = context.async();
HttpRequest<Buffer> request = webClient
.get(url);
if(authHeader != null)
request.putHeader(HttpHeaders.AUTHORIZATION, authHeader);
request.as(BodyCodec.string())
.rxSend()
.map(r -> {
context.assertEquals(expectedStatus, r.statusCode());
if(expectedBody != IGNORE)
context.assertEquals(expectedBody, r.body());
return r;
})
.doOnError(context::fail)
.subscribe(response -> async.complete());
}
@Test
public void testPost() {
int times = 5;
waitFor(times);
HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080));
server.requestStream().handler(req -> req.bodyHandler(buff -> {
assertEquals("onetwothree", buff.toString());
req.response().end();
}));
try {
server.listen(ar -> {
client = WebClient.wrap(vertx.createHttpClient(new HttpClientOptions()));
Observable<Buffer> stream = Observable.just(Buffer.buffer("one"), Buffer.buffer("two"), Buffer.buffer("three"));
Single<HttpResponse<Buffer>> single = client
.post(8080, "localhost", "/the_uri")
.rxSendStream(stream);
for (int i = 0; i < times; i++) {
single.subscribe(resp -> complete(), this::fail);
}
});
await();
} finally {
server.close();
}
}
private void delegateWithCircuitBreaker(RoutingContext rc) {
HttpEndpoint.rxGetWebClient(discovery, svc -> svc.getName().equals("currency-3rdparty-service"))
.flatMap(client -> {
// TODO
// Use the circuit breaker (circuit) to call the service. Use the rxExecuteCommandWithFallback` method.
// This methods takes 2 parameters: the first one if a function taking a `Future` as parameter and
// needs to report the success or failure on this future. The second method is a function providing
// the fallback result. You must provide a JSON object as response. For the fallback use:
// new JsonObject()
// .put("amount", rc.getBodyAsJson().getDouble("amount"))
// .put("currency", "USD"))
// In the first function, use the given client, emit a POST request on / containing the incoming
// payload (rc.getBodyAsJson()). Extract the response payload as JSON (bodyAsJsonObject). Don't
// forget to subscribe (you can use subscribe(toObserver(fut)). You can have a look to the `delegate`
// method as example.
// -----
return Single.just(new JsonObject().put("amount", 0.0).put("currency", "N/A"));
})
// ----
.map(JsonObject::toBuffer)
.map(Buffer::new)
.subscribe(toObserver(rc));
}
/**
* Method to check the proxy requesting to convert the current portfolio to EUR.
*
* @param rc the routing context
*/
private void convertPortfolioToEuro(RoutingContext rc) {
EventBusService.getServiceProxy(discovery, svc -> svc.getName().equals("portfolio"), PortfolioService.class,
ar -> {
if (ar.failed()) {
rc.fail(ar.cause());
} else {
ar.result().evaluate(res -> {
if (res.failed()) {
rc.fail(res.cause());
} else {
JsonObject payload = new JsonObject().put("amount", res.result()).put("currency", "EUR");
rc.setBody(new Buffer(payload.toBuffer()));
delegateWithCircuitBreaker(rc);
}
});
}
});
}
@Test
public void testResponseMissingBody() throws Exception {
int times = 5;
waitFor(times);
HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080));
server.requestStream().handler(req -> req.response().setStatusCode(403).end());
try {
server.listen(ar -> {
client = WebClient.wrap(vertx.createHttpClient(new HttpClientOptions()));
Single<HttpResponse<Buffer>> single = client
.get(8080, "localhost", "/the_uri")
.rxSend();
for (int i = 0; i < times; i++) {
single.subscribe(resp -> {
assertEquals(403, resp.statusCode());
assertNull(resp.body());
complete();
}, this::fail);
}
});
await();
} finally {
server.close();
}
}
@Test
public void shouldNotHandle_invalid_password() throws Exception {
router.route("/")
.handler(passwordPolicyRequestParseHandler)
.handler(rc -> rc.response().end());
when(passwordValidator.validate(anyString())).thenReturn(false);
testRequest(HttpMethod.POST, "/", req -> {
Buffer buffer = Buffer.buffer();
buffer.appendString("password=password");
req.headers().set("content-length", String.valueOf(buffer.length()));
req.headers().set("content-type", "application/x-www-form-urlencoded");
req.write(buffer);
}, resp -> {
String location = resp.headers().get("location");
assertNotNull(location);
assertTrue(location.contains("warning=invalid_password_value"));
},302, "Found", null);
}
@Test
public void shouldHandle() throws Exception {
router.route("/")
.handler(passwordPolicyRequestParseHandler)
.handler(rc -> rc.response().end());
when(passwordValidator.validate(anyString())).thenReturn(true);
testRequest(HttpMethod.POST, "/", req -> {
Buffer buffer = Buffer.buffer();
buffer.appendString("password=password");
req.headers().set("content-length", String.valueOf(buffer.length()));
req.headers().set("content-type", "application/x-www-form-urlencoded");
req.write(buffer);
},200, "OK", null);
}
@Test
public void testGetKeys() {
HttpRequest<Buffer> request = Mockito.mock(HttpRequest.class);
HttpResponse<Buffer> response = Mockito.mock(HttpResponse.class);
String bodyAsString = "{\"keys\":[{\"kty\": \"RSA\",\"use\": \"enc\",\"kid\": \"KID\",\"n\": \"modulus\",\"e\": \"exponent\"}]}";
when(webClient.getAbs(any())).thenReturn(request);
when(request.rxSend()).thenReturn(Single.just(response));
when(response.bodyAsString()).thenReturn(bodyAsString);
TestObserver testObserver = jwkService.getKeys(JWKS_URI).test();
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValue(jwkSet -> ((JWKSet)jwkSet).getKeys().get(0).getKid().equals("KID"));
}
@Test
public void testGetClientKeys_fromJksUriProperty() {
Client client = new Client();
client.setJwksUri(JWKS_URI);
HttpRequest<Buffer> request = Mockito.mock(HttpRequest.class);
HttpResponse<Buffer> response = Mockito.mock(HttpResponse.class);
String bodyAsString = "{\"keys\":[{\"kty\": \"RSA\",\"use\": \"enc\",\"kid\": \"KID\",\"n\": \"modulus\",\"e\": \"exponent\"}]}";
when(webClient.getAbs(any())).thenReturn(request);
when(request.rxSend()).thenReturn(Single.just(response));
when(response.bodyAsString()).thenReturn(bodyAsString);
TestObserver testObserver = jwkService.getKeys(client).test();
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValue(jwkSet -> ((JWKSet)jwkSet).getKeys().get(0).getKid().equals("KID"));
}
@Test
public void create_sectorIdentifierUriBadRequest() {
final String sectorUri = "https://sector/uri";
DynamicClientRegistrationRequest request = new DynamicClientRegistrationRequest();
request.setRedirectUris(Optional.empty());
request.setSectorIdentifierUri(Optional.of(sectorUri));//fail due to invalid url
HttpRequest<Buffer> httpRequest = Mockito.mock(HttpRequest.class);
HttpResponse httpResponse = Mockito.mock(HttpResponse.class);
when(webClient.getAbs(sectorUri)).thenReturn(httpRequest);
when(httpRequest.rxSend()).thenReturn(Single.just(httpResponse));
TestObserver<Client> testObserver = dcrService.create(request, BASE_PATH).test();
testObserver.assertError(InvalidClientMetadataException.class);
testObserver.assertNotComplete();
assertTrue("Should have only one exception", testObserver.errorCount()==1);
assertTrue("Unexpected start of error message", testObserver.errors().get(0).getMessage().startsWith("Unable to parse sector_identifier_uri"));
}
@Test
public void create_sectorIdentifierUri_invalidRedirectUri() {
final String sectorUri = "https://sector/uri";
DynamicClientRegistrationRequest request = new DynamicClientRegistrationRequest();
request.setRedirectUris(Optional.of(Arrays.asList("https://graviee.io/callback")));
request.setSectorIdentifierUri(Optional.of(sectorUri));//fail due to invalid url
HttpRequest<Buffer> httpRequest = Mockito.mock(HttpRequest.class);
HttpResponse httpResponse = Mockito.mock(HttpResponse.class);
when(webClient.getAbs(sectorUri)).thenReturn(httpRequest);
when(httpRequest.rxSend()).thenReturn(Single.just(httpResponse));
when(httpResponse.bodyAsString()).thenReturn("[\"https://not/same/redirect/uri\"]");
TestObserver<Client> testObserver = dcrService.create(request, BASE_PATH).test();
testObserver.assertError(InvalidRedirectUriException.class);
testObserver.assertNotComplete();
}
@Test
public void create_sectorIdentifierUri_validRedirectUri() {
final String redirectUri = "https://graviee.io/callback";
final String sectorUri = "https://sector/uri";
DynamicClientRegistrationRequest request = new DynamicClientRegistrationRequest();
request.setRedirectUris(Optional.of(Arrays.asList(redirectUri)));
request.setSectorIdentifierUri(Optional.of(sectorUri));
HttpRequest<Buffer> httpRequest = Mockito.mock(HttpRequest.class);
HttpResponse httpResponse = Mockito.mock(HttpResponse.class);
when(webClient.getAbs(sectorUri)).thenReturn(httpRequest);
when(httpRequest.rxSend()).thenReturn(Single.just(httpResponse));
when(httpResponse.bodyAsString()).thenReturn("[\""+redirectUri+"\"]");
TestObserver<Client> testObserver = dcrService.create(request, BASE_PATH).test();
testObserver.assertNoErrors();
testObserver.assertComplete();
}
@Test
public void success_simpleRequest() {
PermissionTicket success = new PermissionTicket().setId("success");
final String simpleRequest = "{\"resource_id\":\"{{set_one}}\", \"resource_scopes\":[\"profile:read\"]}";
when(context.getBody()).thenReturn(Buffer.buffer(simpleRequest));
when(context.response()).thenReturn(response);
when(response.putHeader(anyString(),anyString())).thenReturn(response);
when(response.setStatusCode(anyInt())).thenReturn(response);
when(permissionTicketService.create(anyList(), eq(DOMAIN_ID), eq(CLIENT_ID))).thenReturn(Single.just(success));
endpoint.handle(context);
verify(response, times(1)).putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON);
verify(context.response(), times(1)).setStatusCode(intCaptor.capture());
verify(context.response(), times(1)).end(strCaptor.capture());
Assert.assertEquals("Expecting 201 creation status",intCaptor.getValue().intValue(),201);
Assert.assertTrue("Expect success id", strCaptor.getValue().contains("success"));
}
@Test
public void success_extendedRequest() {
PermissionTicket success = new PermissionTicket().setId("success");
final String extendedRequest = "[{\"resource_id\":\"{{set_one}}\", \"resource_scopes\":[\"profile:read\"]}, {\"resource_id\":\"{{set_two}}\",\"resource_scopes\":[\"avatar:write\"]}]";
when(context.getBody()).thenReturn(Buffer.buffer(extendedRequest));
when(context.response()).thenReturn(response);
when(response.putHeader(anyString(),anyString())).thenReturn(response);
when(response.setStatusCode(anyInt())).thenReturn(response);
when(permissionTicketService.create(anyList(), eq(DOMAIN_ID), eq(CLIENT_ID))).thenReturn(Single.just(success));
endpoint.handle(context);
verify(response, times(1)).putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON);
verify(context.response(), times(1)).setStatusCode(intCaptor.capture());
verify(context.response(), times(1)).end(strCaptor.capture());
Assert.assertEquals("Expecting 201 creation status",intCaptor.getValue().intValue(),201);
Assert.assertTrue("Expect success id", strCaptor.getValue().contains("success"));
}
public void delayFlowable(HttpServer server) {
server.requestHandler(request -> {
if (request.method() == HttpMethod.POST) {
// Stop receiving buffers
request.pause();
checkAuth(res -> {
// Now we can receive buffers again
request.resume();
if (res.succeeded()) {
Flowable<Buffer> flowable = request.toFlowable();
flowable.subscribe(buff -> {
// Get buffers
});
}
});
}
});
}
public void writeStreamSubscriberAdapterCallbacks(Flowable<Buffer> flowable, HttpServerResponse response) {
response.setChunked(true);
WriteStreamSubscriber<Buffer> subscriber = response.toSubscriber();
subscriber.onError(throwable -> {
if (!response.headWritten() && response.closed()) {
response.setStatusCode(500).end("oops");
} else {
// log error
}
});
subscriber.onWriteStreamError(throwable -> {
// log error
});
subscriber.onWriteStreamEnd(() -> {
// log end of transaction to audit system...
});
flowable.subscribe(subscriber);
}
public static Flowable<Character> villains() {
return fs().rxReadFile("src/main/resources/characters.json")
.map(Buffer::toJsonArray)
.flatMapPublisher(Flowable::fromIterable)
.cast(JsonObject.class)
.map(j -> j.mapTo(Character.class))
.filter(Character::isVillain);
}
public static void main(String[] args) throws IOException {
Document doc = Jsoup.connect("https://www.superherodb.com/characters/").get();
System.out.println("Scraping " + doc.title());
Elements links = doc.select("a[title]");
Map<String, String> names = new LinkedHashMap<>();
links.forEach(element -> {
String name = element.attr("title");
String href = element.attr("href");
if (name != null && !name.trim().isEmpty() && ! isExcluded(name)) {
names.put(name, href);
}
});
System.out.println(names.size() + " superheros and villains found");
Vertx vertx = Vertx.vertx();
WebClient client = WebClient.create(vertx);
AtomicInteger counter = new AtomicInteger();
Flowable.fromIterable(names.entrySet())
.flatMapSingle(entry -> scrap(client, entry.getKey(), "https://www.superherodb.com" + entry.getValue()))
.doOnNext(superStuff -> System.out.println("Retrieved " + superStuff + " (" + counter.incrementAndGet() + " / " +
names.size() + ")"))
.toList()
.flatMapCompletable(list -> vertx.fileSystem()
.rxWriteFile("src/main/resources/characters.json", new Buffer(Json.encodeToBuffer(list)))
)
.subscribe(
() -> System.out.println("Written " + names.size() + " super heroes and villains"),
Throwable::printStackTrace
);
}
public static void main(String[] args) {
fs()
.rxWriteFile(
"hello.txt", Buffer.buffer("hello")
)
.subscribe(
() -> System.out.println("File written"),
Throwable::printStackTrace
);
}
private Single<String> get(Vertx vertx, URI uri){
WebClient client = WebClient.create(vertx);
Single<HttpResponse<Buffer>> responseHandler =
client.get(uri.getPort(), uri.getHost(), uri.getPath()).rxSend();
return responseHandler.map(response -> response.body().toString())
.doAfterTerminate(() -> client.close());
}
@Override
public void writeTo(Buffer buffer, Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType,
MultivaluedMap<String, Object> httpHeaders, OutputStream entityStream)
throws IOException, WebApplicationException {
for(int i=0;i<buffer.length();i++)
entityStream.write(buffer.getByte(i));
}
@Override
public Completable send(Mail email) {
Single<Optional<Buffer>> htmlRender = email.renderHtml().map(buffer -> Optional.of(buffer)).toSingle(Optional.empty());
Single<Buffer> textRender = email.renderText();
return Single.zip(textRender, htmlRender, (text, html) -> {
send(email, text, html.orElse(null));
return Completable.complete();
}).flatMapCompletable(c -> c);
}
public Single<Buffer> renderText() {
// FIXME: cache variants?
return loadVariants()
.flatMap(variants -> {
System.err.println("Got variants for txt");
String template = variants.getVariantTemplate(MediaType.TEXT_PLAIN_TYPE);
TemplateRenderer renderer = AppGlobals.get().getTemplateRenderer(template);
if(renderer == null)
throw new RuntimeException("Failed to find template renderer for template "+template);
return renderer.render(template, variables);
}).map(response -> (Buffer)response.getEntity());
}
private Handler<Buffer> handleConnection(ServerWebSocket serverWebSocket) {
return data -> {
try {
Sc2Api.Request request = Sc2Api.Request.parseFrom(data.getDelegate().getBytes());
log.debug("Received request {}", request);
Sc2Api.Response response = requestHandler.handle(
request);
sendResponse(serverWebSocket, response);
} catch (Exception e) {
sendResponse(serverWebSocket, Sc2Api.Response.newBuilder().addError(e.getMessage()).build());
}
};
}
/**
* Utility method to report the completion/failure from a Single to a Routing Context.
*
* @param rc the routing context
* @return the single observer to pass to {@link Single#subscribe()}
*/
public static SingleObserver<Buffer> toObserver(RoutingContext rc) {
return new SingleObserver<Buffer>() {
public void onSubscribe(@NonNull Disposable d) {
}
public void onSuccess(@NonNull Buffer payload) {
rc.response().end(payload);
}
public void onError(Throwable error) {
rc.fail(error);
}
};
}
private void delegateWithCircuitBreaker(RoutingContext rc) {
HttpEndpoint.rxGetWebClient(discovery, svc -> svc.getName().equals("currency-3rdparty-service"))
.flatMap(client ->
// TODO
// Use the circuit breaker (circuit) to call the service. Use the rxExecuteCommandWithFallback` method.
// This methods takes 2 parameters: the first one if a function taking a `Future` as parameter and
// needs to report the success or failure on this future. The second method is a function providing
// the fallback result. You must provide a JSON object as response. For the fallback use:
// new JsonObject()
// .put("amount", rc.getBodyAsJson().getDouble("amount"))
// .put("currency", "USD"))
// In the first function, use the given client, emit a POST request on / containing the incoming
// payload (rc.getBodyAsJson()). Extract the response payload as JSON (bodyAsJsonObject). Don't
// forget to subscribe (you can use subscribe(toObserver(fut)). You can have a look to the `delegate`
// method as example.
// -----
circuit.rxExecuteCommandWithFallback(
fut ->
client.post("/").rxSendJsonObject(rc.getBodyAsJson())
.map(HttpResponse::bodyAsJsonObject)
.subscribe(toObserver(fut)),
err -> new JsonObject()
.put("amount", rc.getBodyAsJson().getDouble("amount"))
.put("currency", "USD")))
// ----
.map(JsonObject::toBuffer)
.map(Buffer::new)
.subscribe(toObserver(rc));
}
private static Function<HttpClientResponse, Publisher<? extends Buffer>> toBufferFlowable() {
return response -> response
.toObservable()
.reduce(
Buffer.buffer(),
Buffer::appendBuffer).toFlowable();
}
protected void testRequestBuffer(HttpClient client, HttpMethod method, int port, String path, Consumer<HttpClientRequest> requestAction, Consumer<HttpClientResponse> responseAction,
int statusCode, String statusMessage,
Buffer responseBodyBuffer, boolean normalizeLineEndings) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
HttpClientRequest req = client.request(method, port, "localhost", path, resp -> {
assertEquals(statusCode, resp.statusCode());
assertEquals(statusMessage, resp.statusMessage());
if (responseAction != null) {
responseAction.accept(resp);
}
if (responseBodyBuffer == null) {
latch.countDown();
} else {
resp.bodyHandler(buff -> {
if (normalizeLineEndings) {
buff = normalizeLineEndingsFor(buff);
}
assertEquals(responseBodyBuffer, buff);
latch.countDown();
});
}
});
if (requestAction != null) {
requestAction.accept(req);
}
req.end();
awaitLatch(latch);
}