下面列出了 org.springframework.core.io.buffer.NettyDataBuffer#com.linecorp.armeria.common.HttpData 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Serializes {@code resObj} as JSON and wraps it into an {@link HttpResponse}.
 *
 * <p>The content type falls back to {@code MediaType.JSON_UTF_8} when {@code headers} does not carry one.
 * When no {@code Location} header is present, the {@code "url"} field is taken out of the JSON tree and
 * sent as the {@code Location} header instead of as part of the body.
 *
 * @param ctx the request context, used only when building the error response
 * @param headers the headers the response headers are derived from
 * @param resObj the object to serialize
 * @param trailingHeaders the trailers handed over to the response unchanged
 * @return the converted response, or the response created by {@code HttpApiUtil.newResponse} with
 *         {@code HttpStatus.INTERNAL_SERVER_ERROR} when a {@link JsonProcessingException} is raised
 */
@Override
public HttpResponse convertResponse(ServiceRequestContext ctx, ResponseHeaders headers,
                                    @Nullable Object resObj,
                                    HttpHeaders trailingHeaders) throws Exception {
    try {
        final ResponseHeadersBuilder responseHeaders = headers.toBuilder();
        final boolean contentTypeMissing = responseHeaders.contentType() == null;
        if (contentTypeMissing) {
            responseHeaders.contentType(MediaType.JSON_UTF_8);
        }

        final JsonNode tree = Jackson.valueToTree(resObj);
        final boolean locationMissing = responseHeaders.get(HttpHeaderNames.LOCATION) == null;
        if (locationMissing) {
            final String location = tree.get("url").asText();
            // Remove the url field and send it with the LOCATION header.
            ((ObjectNode) tree).remove("url");
            responseHeaders.add(HttpHeaderNames.LOCATION, location);
        }

        final ResponseHeaders finalHeaders = responseHeaders.build();
        final byte[] serialized = Jackson.writeValueAsBytes(tree);
        return HttpResponse.of(finalHeaders, HttpData.wrap(serialized), trailingHeaders);
    } catch (JsonProcessingException cause) {
        logger.debug("Failed to convert a response:", cause);
        return HttpApiUtil.newResponse(ctx, HttpStatus.INTERNAL_SERVER_ERROR, cause);
    }
}
/**
 * Checks that reading a project, creating a repository and reading a repository succeed or fail
 * with {@code expectedFailureStatus} depending on the given role and permissions.
 */
@ParameterizedTest
@MethodSource("arguments")
void test(String secret, String projectName, ProjectRole role, String repoName,
          Set<Permission> permission, HttpStatus expectedFailureStatus) {
    final WebClient client = WebClient.builder(server.httpUri())
                                      .addHttpHeader(HttpHeaderNames.AUTHORIZATION, "Bearer " + secret)
                                      .build();
    final String projectPath = "/projects/" + projectName;
    final String repoPath = projectPath + "/repos/" + repoName;

    // Reading the project is expected to succeed for owners and members only.
    final AggregatedHttpResponse projectRes = client.get(projectPath).aggregate().join();
    final boolean ownerOrMember = role == ProjectRole.OWNER || role == ProjectRole.MEMBER;
    assertThat(projectRes.status()).isEqualTo(ownerOrMember ? HttpStatus.OK : expectedFailureStatus);

    // Posting to the repository is expected to succeed only with the WRITE permission.
    final AggregatedHttpResponse postRes = client.post(repoPath, HttpData.empty()).aggregate().join();
    final boolean writable = permission.contains(Permission.WRITE);
    assertThat(postRes.status()).isEqualTo(writable ? HttpStatus.OK : expectedFailureStatus);

    // Reading the repository is expected to succeed with any permission at all.
    final AggregatedHttpResponse getRes = client.get(repoPath).aggregate().join();
    final boolean noPermission = permission.isEmpty();
    assertThat(getRes.status()).isEqualTo(noPermission ? expectedFailureStatus : HttpStatus.OK);
}
/**
 * Checks that {@code from(...)} emits {@code OCTET_STREAM_HEADERS} followed by every {@link HttpData}
 * for both a {@code Flux} and a {@code Stream} source, and that a single {@link HttpData} additionally
 * gets a {@code content-length} header.
 */
@Test
void streaming_HttpData() throws Exception {
    final HttpData foo = HttpData.ofUtf8("foo");
    final HttpData bar = HttpData.ofUtf8("bar");
    final HttpData baz = HttpData.ofUtf8("baz");
    final List<HttpData> contents = ImmutableList.of(foo, bar, baz);

    final List<Object> sources = ImmutableList.of(Flux.fromIterable(contents), contents.stream());
    for (final Object source : sources) {
        StepVerifier.create(from(source))
                    .expectNext(OCTET_STREAM_HEADERS)
                    .expectNext(foo)
                    .expectNext(bar)
                    .expectNext(baz)
                    .expectComplete()
                    .verify();
    }

    // A lone HttpData has a known size, so the expected headers carry its length (3 bytes).
    StepVerifier.create(from(foo))
                .expectNext(OCTET_STREAM_HEADERS.toBuilder()
                                                .addInt(HttpHeaderNames.CONTENT_LENGTH, 3)
                                                .build())
                .expectNext(foo)
                .expectComplete()
                .verify();
}
/**
 * Posts {@code "Armeria"} to {@code /hello} and checks the status, content type, additional headers,
 * decorated content and additional trailers of the response.
 */
@Test
public void metaAnnotations() {
    final WebClient client = WebClient.of(rule.httpUri());
    final RequestHeaders requestHeaders =
            RequestHeaders.of(HttpMethod.POST, "/hello",
                              HttpHeaderNames.CONTENT_TYPE, MediaType.PLAIN_TEXT_UTF_8,
                              HttpHeaderNames.ACCEPT, "text/*");
    final AggregatedHttpResponse res =
            client.execute(requestHeaders, HttpData.ofUtf8("Armeria")).aggregate().join();

    // Status line and content type.
    assertThat(res.status()).isEqualTo(HttpStatus.CREATED);
    assertThat(res.contentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);

    // Extra response headers.
    assertThat(res.headers().get(HttpHeaderNames.of("x-foo"))).isEqualTo("foo");
    assertThat(res.headers().get(HttpHeaderNames.of("x-bar"))).isEqualTo("bar");

    // Body, as produced after the three decorators.
    assertThat(res.contentUtf8())
            .isEqualTo("Hello, Armeria (decorated-1) (decorated-2) (decorated-3)!");

    // Extra trailers.
    assertThat(res.trailers().get(HttpHeaderNames.of("x-baz"))).isEqualTo("baz");
    assertThat(res.trailers().get(HttpHeaderNames.of("x-qux"))).isEqualTo("qux");
}
/**
 * Checks that the callback is invoked exactly once with a {@code "cancelled"} exception when the mocked
 * call reports cancellation while a response is being delivered.
 */
@Test
public void cancel() throws Exception {
    // tryFinish() never succeeds; isCanceled() turns true on its third invocation.
    when(armeriaCall.tryFinish()).thenReturn(false);
    when(armeriaCall.isCanceled()).thenReturn(false, false, true);

    final ManualMockCallback mockCallback = new ManualMockCallback();
    final Request okHttpRequest = new Request.Builder().url("http://foo.com").build();
    final BlockingCallSubscriber callSubscriber =
            new BlockingCallSubscriber(armeriaCall, mockCallback, okHttpRequest);

    callSubscriber.onSubscribe(subscription);
    callSubscriber.onNext(ResponseHeaders.of(200));
    callSubscriber.onNext(HttpData.ofUtf8("{\"name\":\"foo\"}"));
    callSubscriber.onComplete();

    verify(subscription).request(Long.MAX_VALUE);
    assertThat(mockCallback.callbackCallingCount).isEqualTo(1);
    assertThat(mockCallback.exception.getMessage()).isEqualTo("cancelled");
}
/**
 * Checks that {@link HttpData} delivered to a {@code BlockingCallSubscriber} after the trailers does not
 * end up in the response body.
 */
@Test
public void dataIsIgnoredAfterTrailers() throws Exception {
    when(armeriaCall.tryFinish()).thenReturn(true);

    final ManualMockCallback mockCallback = new ManualMockCallback();
    final Request okHttpRequest = new Request.Builder().url("http://bar.com").build();
    final BlockingCallSubscriber callSubscriber =
            new BlockingCallSubscriber(armeriaCall, mockCallback, okHttpRequest);

    callSubscriber.onSubscribe(subscription);
    callSubscriber.onNext(ResponseHeaders.of(100));
    callSubscriber.onNext(ResponseHeaders.of(200));
    // Trailers.
    callSubscriber.onNext(HttpHeaders.of(HttpHeaderNames.of("foo"), "bar"));
    // Ignored.
    callSubscriber.onNext(HttpData.ofUtf8("baz"));
    callSubscriber.onComplete();

    verify(subscription).request(Long.MAX_VALUE);
    assertThat(mockCallback.callbackCallingCount).isEqualTo(1);
    // Currently, there's no way to retrieve trailers.
    assertThat(mockCallback.response.header("foo")).isNull();
    assertThat(mockCallback.response.body().string()).isEmpty();
}
/**
 * Checks that the body of an {@code ArmeriaClientHttpResponse} can be consumed one element at a time
 * and that the underlying {@link HttpResponse} completes only after the body has been read.
 */
@Test
public void readBodyStream() {
    final ResponseHeaders httpHeaders = ResponseHeaders.of(HttpStatus.OK);
    final Flux<HttpData> chunks = Flux.just("a", "b", "c", "d", "e")
                                      .map(HttpData::ofUtf8);
    final HttpResponse httpResponse = HttpResponse.of(Flux.concat(Mono.just(httpHeaders), chunks));

    final ArmeriaHttpClientResponseSubscriber responseSubscriber =
            new ArmeriaHttpClientResponseSubscriber(httpResponse);
    final ArmeriaClientHttpResponse response = response(responseSubscriber, httpHeaders);

    assertThat(response.getStatusCode()).isEqualTo(org.springframework.http.HttpStatus.OK);
    // Nothing has consumed the body yet, so the response must still be open.
    assertThat(httpResponse.whenComplete().isDone()).isFalse();

    final Flux<String> body = response.getBody().map(TestUtil::bufferToString);
    // Start with a demand of 1 and ask for one more element after each one received.
    StepVerifier.create(body, 1)
                .expectNext("a").thenRequest(1)
                .expectNext("b").thenRequest(1)
                .expectNext("c").thenRequest(1)
                .expectNext("d").thenRequest(1)
                .expectNext("e").thenRequest(1)
                .expectComplete()
                .verify();

    await().until(() -> httpResponse.whenComplete().isDone());
}
/**
 * Checks that {@link HttpData} delivered to a {@code StreamingCallSubscriber} after the trailers does not
 * end up in the response body.
 */
@Test
public void dataIsIgnoredAfterTrailers() throws Exception {
    when(armeriaCall.tryFinish()).thenReturn(true);

    final ManualMockCallback mockCallback = new ManualMockCallback();
    final Request okHttpRequest = new Request.Builder().url("http://bar.com").build();
    final StreamingCallSubscriber callSubscriber =
            new StreamingCallSubscriber(armeriaCall, mockCallback, okHttpRequest,
                                        MoreExecutors.directExecutor());

    callSubscriber.onSubscribe(subscription);
    callSubscriber.onNext(ResponseHeaders.of(100));
    callSubscriber.onNext(ResponseHeaders.of(200));
    // Trailers.
    callSubscriber.onNext(HttpHeaders.of(HttpHeaderNames.of("foo"), "bar"));
    // Ignored.
    callSubscriber.onNext(HttpData.ofUtf8("baz"));
    callSubscriber.onComplete();

    // The streaming subscriber is expected to pull objects one by one.
    verify(subscription, times(4)).request(1L);
    await().untilAsserted(() -> assertThat(mockCallback.callbackCallingCount).isEqualTo(1));

    // TODO(minwoox) Remove after we can retrieve trailers.
    TimeUnit.SECONDS.sleep(2);

    // Currently, there's no way to retrieve trailers.
    assertThat(mockCallback.response.header("foo")).isNull();
    assertThat(mockCallback.response.body().string()).isEmpty();
}
/**
 * Checks that reading the stream of a deframed message fails with an {@code ArmeriaStatusException}
 * when a compressed frame shorter than 1024 bytes is built from a 1024-character payload.
 */
@Test
void deframe_tooLargeCompressed() throws Exception {
    // Simple repeated character compresses below the frame threshold but uncompresses above it.
    final Payload.Builder payload =
            Payload.newBuilder()
                   .setBody(ByteString.copyFromUtf8(Strings.repeat("a", 1024)));
    final SimpleRequest request = SimpleRequest.newBuilder()
                                               .setPayload(payload)
                                               .build();

    final byte[] uncompressed = request.toByteArray();
    final byte[] frame = GrpcTestUtil.compressedFrame(Unpooled.wrappedBuffer(uncompressed));
    assertThat(frame.length).isLessThan(1024);

    deframer.request(1);
    deframer.deframe(HttpData.wrap(frame), false);

    // Exactly one message must have reached the listener.
    final ArgumentCaptor<DeframedMessage> captor = ArgumentCaptor.forClass(DeframedMessage.class);
    verify(listener).messageRead(captor.capture());
    verifyNoMoreInteractions(listener);

    try (InputStream messageStream = captor.getValue().stream()) {
        assertThatThrownBy(() -> ByteStreams.toByteArray(messageStream))
                .isInstanceOf(ArmeriaStatusException.class);
    }
}
/**
 * Checks the round trip {@link HttpData} -> {@code DataBuffer} -> {@link HttpData} through a
 * {@code DataBufferFactoryWrapper} backed by a {@code DefaultDataBufferFactory}.
 */
@Test
public void usingDefaultDataBufferFactory_HttpData() {
    final DefaultDataBufferFactory delegate = new DefaultDataBufferFactory();
    final DataBufferFactoryWrapper<?> wrapper = new DataBufferFactoryWrapper<>(delegate);

    // HttpData -> DataBuffer
    final HttpData source = HttpData.ofUtf8("abc");
    final DataBuffer buffer = wrapper.toDataBuffer(source);
    assertThat(buffer).isInstanceOf(DefaultDataBuffer.class);
    assertThat(buffer.asByteBuffer()).isEqualTo(ByteBuffer.wrap("abc".getBytes()));

    // DataBuffer -> HttpData
    final HttpData converted = wrapper.toHttpData(buffer);
    assertThat(converted).isInstanceOf(PooledHttpData.class);
    final PooledHttpData pooled = (PooledHttpData) converted;
    assertThat(pooled.refCnt()).isOne();
    assertThat(ByteBufUtil.getBytes(pooled.content())).isEqualTo("abc".getBytes());
}
/**
 * Checks, for each provided scheme, that {@code GET /route} answers {@code "route"} and that posting JSON
 * to {@code /route2} answers a single-element JSON array containing {@code "route"}.
 */
@ParameterizedTest
@ArgumentsSource(SchemesProvider.class)
void shouldGetHelloFromRouter(String scheme) throws Exception {
    final String baseUri = scheme + "://example.com:" + port;
    final WebClient client = WebClient.builder(baseUri)
                                      .factory(clientFactory)
                                      .build();

    final AggregatedHttpResponse getRes = client.get("/route").aggregate().join();
    assertThat(getRes.contentUtf8()).isEqualTo("route");

    final RequestHeaders postHeaders = RequestHeaders.of(HttpMethod.POST, "/route2",
                                                         HttpHeaderNames.CONTENT_TYPE, JSON_UTF_8);
    final HttpData postBody = HttpData.wrap("{\"a\":1}".getBytes());
    final AggregatedHttpResponse postRes = client.execute(postHeaders, postBody).aggregate().join();
    assertThatJson(postRes.contentUtf8()).isArray()
                                         .ofLength(1)
                                         .thatContains("route");
}
/**
 * Called once the SAML authentication process has finished and the user is authenticated. Information
 * about the authenticated user, in particular the login name, is available from the {@link Response}.
 * This example uses the e-mail address as the login name and hands it to the web browser through a
 * {@code Set-Cookie} header.
 *
 * @return a {@code 401 Unauthorized} HTML response when no username can be resolved; otherwise a
 *         {@code 200 OK} HTML page that sets the {@code username} cookie and navigates to
 *         {@code /welcome} on load
 */
@Override
public HttpResponse loginSucceeded(ServiceRequestContext ctx, AggregatedHttpRequest req,
                                   MessageContext<Response> message, @Nullable String sessionIndex,
                                   @Nullable String relayState) {
    final NameID nameId = getNameId(message.getMessage(), SamlNameIdFormat.EMAIL);
    final String username;
    if (nameId == null) {
        username = null;
    } else {
        username = nameId.getValue();
    }

    if (username == null) {
        return HttpResponse.of(HttpStatus.UNAUTHORIZED, MediaType.HTML_UTF_8,
                               "<html><body>Username is not found.</body></html>");
    }

    logger.info("{} user '{}' has been logged in.", ctx, username);

    final Cookie usernameCookie = Cookie.builder("username", username)
                                        .httpOnly(true)
                                        .domain("localhost")
                                        .maxAge(60)
                                        .path("/")
                                        .build();
    final ResponseHeaders responseHeaders =
            ResponseHeaders.of(HttpStatus.OK,
                               HttpHeaderNames.CONTENT_TYPE, MediaType.HTML_UTF_8,
                               HttpHeaderNames.SET_COOKIE, usernameCookie.toSetCookieHeader(false));
    final HttpData page =
            HttpData.ofUtf8("<html><body onLoad=\"window.location.href='/welcome'\"></body></html>");
    return HttpResponse.of(responseHeaders, page);
}
/**
 * Checks that server-sent events whose data was stringified as JSON are emitted as
 * {@code data:<json>} frames after {@code EVENT_STREAM_HEADER}.
 */
@Test
public void withDataStringifier() throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    // Wraps the checked Jackson exception so that the function fits java.util.function.Function.
    final Function<Object, String> stringifier = value -> {
        try {
            return mapper.writeValueAsString(value);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    };

    final String listJson = stringifier.apply(ImmutableList.of("foo", "bar"));
    final String mapJson = stringifier.apply(ImmutableMap.of("foo", "bar"));
    final HttpResponse response = doConvert(Flux.just(ServerSentEvent.ofData(listJson),
                                                      ServerSentEvent.ofData(mapJson)));

    StepVerifier.create(response)
                .expectNext(EVENT_STREAM_HEADER)
                .expectNext(HttpData.ofUtf8("data:[\"foo\",\"bar\"]\n\n"))
                .expectNext(HttpData.ofUtf8("data:{\"foo\":\"bar\"}\n\n"))
                .expectComplete()
                .verify();
}
/**
 * Copies the readable region of {@code chunk} and appends it to {@code data} as an {@link HttpData}.
 *
 * @param chunk the chunk whose bytes between {@code getStart()} and {@code getEnd()} are written
 * @return the number of bytes taken from {@code chunk}; {@code 0} when the region is empty, in which
 *         case nothing is appended
 */
public int doWrite(ByteChunk chunk) {
    final int from = chunk.getStart();
    final int to = chunk.getEnd();
    final int numBytes = to - from;

    if (numBytes != 0) {
        // NB: We make a copy because Tomcat reuses the underlying byte array of 'chunk'.
        final byte[] copy = Arrays.copyOfRange(chunk.getBuffer(), from, to);
        data.add(HttpData.wrap(copy));
        bytesWritten += numBytes;
    }
    return numBytes;
}
/**
 * Writes {@code data} as a sequence of {@code DefaultHttpContent} chunks, each at most
 * {@code MAX_TLS_DATA_LENGTH} bytes long, optionally followed by {@code LastHttpContent.EMPTY_LAST_CONTENT},
 * and then flushes the channel. At least one chunk is always written, even when {@code data} is empty.
 * {@code data} is released when this method exits, whether normally or exceptionally.
 *
 * @return the future of the last write that was issued
 */
private ChannelFuture doWriteSplitData(int id, HttpData data, boolean endStream) {
    try {
        int position = 0;
        int bytesLeft = data.length();
        ChannelFuture lastWrite;
        do {
            // Ensure an HttpContent does not exceed the maximum length of a cleartext TLS record.
            final int chunkLength = Math.min(MAX_TLS_DATA_LENGTH, bytesLeft);
            lastWrite = write(id, new DefaultHttpContent(dataChunk(data, position, chunkLength)), false);
            bytesLeft -= chunkLength;
            position += chunkLength;
        } while (bytesLeft != 0);

        if (endStream) {
            lastWrite = write(id, LastHttpContent.EMPTY_LAST_CONTENT, true);
        }

        ch.flush();
        return lastWrite;
    } finally {
        ReferenceCountUtil.safeRelease(data);
    }
}
/**
 * Writes {@code data} to the HTTP/2 stream {@code streamId}.
 *
 * <ul>
 *   <li>If the stream is present and writable, the keep-alive handler (if any) is notified and the data
 *       is handed to the encoder.</li>
 *   <li>If the stream may have been created earlier (i.e. it is outdated/closed), {@code data} is
 *       released; empty data results in an empty write-and-flush, non-empty data in a future failed
 *       with {@code ClosedStreamException}.</li>
 *   <li>Otherwise {@code data} is released and a future failed with {@link IllegalStateException} is
 *       returned, because a new stream cannot start with a DATA frame.</li>
 * </ul>
 *
 * @param id the request ID (not used by this method body)
 * @param streamId the HTTP/2 stream ID to write to
 * @param data the data to write; it is released by this method whenever it is not passed to the encoder
 * @param endStream whether this write ends the stream
 * @return the future of the write, or a failed future as described above
 */
@Override
public final ChannelFuture doWriteData(int id, int streamId, HttpData data, boolean endStream) {
    if (isStreamPresentAndWritable(streamId)) {
        final KeepAliveHandler keepAliveHandler = keepAliveHandler();
        if (keepAliveHandler != null) {
            keepAliveHandler.onReadOrWrite();
        }
        // Write to an existing stream.
        return encoder.writeData(ctx, streamId, toByteBuf(data), 0, endStream, ctx.newPromise());
    }

    if (encoder.connection().local().mayHaveCreatedStream(streamId)) {
        // Can't write to an outdated (closed) stream.
        // Read the emptiness BEFORE releasing: a released object must not be accessed any more.
        final boolean empty = data.isEmpty();
        ReferenceCountUtil.safeRelease(data);
        return empty ? ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                     : newFailedFuture(ClosedStreamException.get());
    }

    // Cannot start a new stream with a DATA frame. It must start with a HEADERS frame.
    ReferenceCountUtil.safeRelease(data);
    return newFailedFuture(new IllegalStateException(
            "Trying to write data to the closed stream " + streamId +
            " or start a new stream with a DATA frame"));
}
/**
 * Serves the SAML service provider metadata for {@code defaultHostname}.
 *
 * <p>The metadata is built on first use and kept in {@code metadataMap}. When building fails, the
 * failure is logged and {@code HttpData.empty()} is stored instead, which makes this method answer with
 * {@code 404 Not Found}.
 *
 * @return a response with {@code HTTP_HEADERS} and the metadata, or a {@code 404 Not Found} response
 */
@Override
public HttpResponse serve(ServiceRequestContext ctx, AggregatedHttpRequest req,
                          String defaultHostname, SamlPortConfig portConfig) {
    final HttpData metadata = metadataMap.computeIfAbsent(defaultHostname, hostname -> {
        try {
            final Element descriptor = SamlMessageUtil.serialize(
                    buildMetadataEntityDescriptorElement(hostname, portConfig));
            final HttpData prepared = HttpData.ofUtf8(nodeToString(descriptor));
            logger.debug("SAML service provider metadata has been prepared for: {}.", hostname);
            return prepared;
        } catch (Throwable cause) {
            logger.warn("{} Unexpected metadata request.", ctx, cause);
            return HttpData.empty();
        }
    });

    // The empty instance acts as the "could not build metadata" marker.
    if (metadata == HttpData.empty()) {
        return HttpResponse.of(HttpStatus.NOT_FOUND);
    }
    return HttpResponse.of(HTTP_HEADERS, metadata);
}
/**
 * Checks that the healthiness served at {@code /hc_updatable} can be switched off and on again with
 * JSON {@code PATCH} requests.
 */
@Test
void updateUsingPatch() {
    final WebClient client = WebClient.of(server.httpUri());

    // Make unhealthy.
    final AggregatedHttpResponse unhealthyRes =
            client.execute(RequestHeaders.of(HttpMethod.PATCH, "/hc_updatable"),
                           "[{\"op\":\"replace\",\"path\":\"/healthy\",\"value\":false}]")
                  .aggregate().join();
    final AggregatedHttpResponse expectedUnhealthy = AggregatedHttpResponse.of(
            ResponseHeaders.of(HttpStatus.SERVICE_UNAVAILABLE,
                               HttpHeaderNames.CONTENT_TYPE, MediaType.JSON_UTF_8,
                               "armeria-lphc", "60, 5"),
            HttpData.ofUtf8("{\"healthy\":false}"));
    assertThat(unhealthyRes).isEqualTo(expectedUnhealthy);

    // Make healthy.
    final AggregatedHttpResponse healthyRes =
            client.execute(RequestHeaders.of(HttpMethod.PATCH, "/hc_updatable"),
                           "[{\"op\":\"replace\",\"path\":\"/healthy\",\"value\":true}]")
                  .aggregate().join();
    final AggregatedHttpResponse expectedHealthy = AggregatedHttpResponse.of(
            ResponseHeaders.of(HttpStatus.OK,
                               HttpHeaderNames.CONTENT_TYPE, MediaType.JSON_UTF_8,
                               "armeria-lphc", "60, 5"),
            HttpData.ofUtf8("{\"healthy\":true}"));
    assertThat(healthyRes).isEqualTo(expectedHealthy);
}
/**
 * Checks that posting to a method the gRPC service does not know yields a headers-only response carrying
 * {@code grpc-status: 12} and a {@code "Method not found"} message.
 */
@Test
void missingMethod() throws Exception {
    final RequestHeaders requestHeaders =
            RequestHeaders.of(HttpMethod.POST, "/grpc.testing.TestService/FooCall",
                              HttpHeaderNames.CONTENT_TYPE, "application/grpc+proto");
    final HttpRequest req = HttpRequest.of(requestHeaders);

    final RoutingResult routingResult = RoutingResult.builder()
                                                     .path("/grpc.testing.TestService/FooCall")
                                                     .build();
    final ServiceRequestContext ctx = ServiceRequestContext.builder(req)
                                                           .routingResult(routingResult)
                                                           .build();

    final HttpResponse response = grpcService.doPost(ctx, PooledHttpRequest.of(req));

    final AggregatedHttpResponse expected = AggregatedHttpResponse.of(
            ResponseHeaders.builder(HttpStatus.OK)
                           .endOfStream(true)
                           .add(HttpHeaderNames.CONTENT_TYPE, "application/grpc+proto")
                           .addInt("grpc-status", 12)
                           .add("grpc-message", "Method not found: grpc.testing.TestService/FooCall")
                           .addInt(HttpHeaderNames.CONTENT_LENGTH, 0)
                           .build(),
            HttpData.empty());
    assertThat(response.aggregate().get()).isEqualTo(expected);
}
/**
 * Counts the received bytes and requests the next object.
 *
 * <p>Each time the byte count crosses another multiple of {@code STREAMING_CONTENT_CHUNK_LENGTH}, the
 * chunk counter is incremented and the progress is logged; in {@code slow} mode the next request is
 * additionally postponed by one second.
 */
@Override
public void onNext(HttpObject obj) {
    if (obj instanceof HttpData) {
        numReceivedBytes += ((HttpData) obj).length();
    }

    final boolean chunkCompleted =
            numReceivedBytes >= (numReceivedChunks + 1L) * STREAMING_CONTENT_CHUNK_LENGTH;
    if (!chunkCompleted) {
        subscription.request(1);
        return;
    }

    numReceivedChunks++;
    if (slow) {
        // Add 1 second delay for every chunk received.
        executor.schedule(() -> subscription.request(1), 1, TimeUnit.SECONDS);
    } else {
        subscription.request(1);
    }
    logger.debug("{} bytes received", numReceivedBytes);
}
/**
 * Collects {@code data} into {@code contentList} unless the aggregation is already done, the data is
 * empty, or adding it would push the total content length past {@link Integer#MAX_VALUE}. In the last
 * case the subscription is cancelled and the aggregation fails with an {@link IllegalStateException}.
 *
 * <p>{@code data} is released whenever it has not been stored, including when an exception is thrown.
 */
protected void onData(HttpData data) {
    boolean stored = false;
    try {
        if (future.isDone()) {
            return;
        }

        final int length = data.length();
        if (length <= 0) {
            return;
        }

        // Compare against the remaining headroom so that the check itself cannot overflow.
        final int headroom = Integer.MAX_VALUE - contentLength;
        if (length > headroom) {
            subscription.cancel();
            fail(new IllegalStateException("content length greater than Integer.MAX_VALUE"));
            return;
        }

        contentList.add(data);
        contentLength += length;
        stored = true;
    } finally {
        if (!stored) {
            ReferenceCountUtil.safeRelease(data);
        }
    }
}
/**
 * Checks that informational headers, their additional headers, the final headers, data and trailers are
 * all accepted by the wrapper and published in the order they were written.
 */
@Test
void informationalHeadersHeadersDataAndTrailers() throws Exception {
    final DecodedHttpResponse res = new DecodedHttpResponse(CommonPools.workerGroup().next());
    final HttpResponseWrapper wrapper = httpResponseWrapper(res);
    final int fooLength = "foo".length();

    // Write phase: every object must be accepted.
    assertThat(wrapper.tryWrite(ResponseHeaders.of(100))).isTrue();
    assertThat(wrapper.tryWrite(HttpHeaders.of(HttpHeaderNames.of("a"), "b"))).isTrue();
    assertThat(wrapper.tryWrite(
            ResponseHeaders.of(HttpStatus.OK, HttpHeaderNames.CONTENT_LENGTH, fooLength))).isTrue();
    assertThat(wrapper.tryWrite(HttpData.ofUtf8("foo"))).isTrue();
    assertThat(wrapper.tryWrite(HttpHeaders.of(HttpHeaderNames.of("bar"), "baz"))).isTrue();
    wrapper.close();

    // Read phase: the same objects must come out in the same order.
    StepVerifier.create(res)
                .expectNext(ResponseHeaders.of(100))
                .expectNext(HttpHeaders.of(HttpHeaderNames.of("a"), "b"))
                .expectNext(ResponseHeaders.of(HttpStatus.OK, HttpHeaderNames.CONTENT_LENGTH, 3))
                .expectNext(HttpData.ofUtf8("foo"))
                .expectNext(HttpHeaders.of(HttpHeaderNames.of("bar"), "baz"))
                .expectComplete()
                .verify();
}
/**
 * Checks that a second set of trailers written after the first one is rejected by the wrapper and is
 * not published.
 */
@Test
void splitTrailersAfterDataIsIgnored() throws Exception {
    final DecodedHttpResponse res = new DecodedHttpResponse(CommonPools.workerGroup().next());
    final HttpResponseWrapper wrapper = httpResponseWrapper(res);
    final int fooLength = "foo".length();

    assertThat(wrapper.tryWrite(
            ResponseHeaders.of(HttpStatus.OK, HttpHeaderNames.CONTENT_LENGTH, fooLength))).isTrue();
    assertThat(wrapper.tryWrite(HttpData.ofUtf8("foo"))).isTrue();
    // First trailers: accepted.
    assertThat(wrapper.tryWrite(HttpHeaders.of(HttpHeaderNames.of("bar"), "baz"))).isTrue();
    // Second trailers: rejected.
    assertThat(wrapper.tryWrite(HttpHeaders.of(HttpHeaderNames.of("qux"), "quux"))).isFalse();
    wrapper.close();

    StepVerifier.create(res)
                .expectNext(ResponseHeaders.of(HttpStatus.OK, HttpHeaderNames.CONTENT_LENGTH, 3))
                .expectNext(HttpData.ofUtf8("foo"))
                .expectNext(HttpHeaders.of(HttpHeaderNames.of("bar"), "baz"))
                .expectComplete()
                .verify();
}
/**
 * Checks that a long-polling style request ({@code Prefer: wait=60} with a matching
 * {@code If-None-Match}) to {@code /hc_long_polling_disabled} is answered within 10 seconds with the
 * current healthiness and an {@code armeria-lphc} header of {@code "0, 0"}.
 */
@Test
void longPollingDisabled() throws Exception {
    final WebClient client = WebClient.of(server.httpUri());
    final RequestHeaders requestHeaders =
            RequestHeaders.of(HttpMethod.GET, "/hc_long_polling_disabled",
                              HttpHeaderNames.PREFER, "wait=60",
                              HttpHeaderNames.IF_NONE_MATCH, "\"healthy\"");
    final CompletableFuture<AggregatedHttpResponse> pending = client.execute(requestHeaders).aggregate();

    final AggregatedHttpResponse expected = AggregatedHttpResponse.of(
            ResponseHeaders.of(HttpStatus.OK,
                               HttpHeaderNames.CONTENT_TYPE, MediaType.JSON_UTF_8,
                               "armeria-lphc", "0, 0"),
            HttpData.ofUtf8("{\"healthy\":true}"));
    assertThat(pending.get(10, TimeUnit.SECONDS)).isEqualTo(expected);
}
// Registers a GET service at "/large-stream" whose response is produced by a hand-written
// Subscription, and sets the server's request timeout to 30 seconds.
@Override
protected void configure(ServerBuilder sb) throws Exception {
sb.service("/large-stream", new AbstractHttpService() {
@Override
protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) throws Exception {
return HttpResponse.of(s -> s.onSubscribe(new Subscription() {
// Accumulated demand; it is advanced only AFTER the emitting loop of each request() call.
int count;
@Override
public void request(long n) {
// Emits one object per requested item: ResponseHeaders while 'count' is still 0,
// otherwise a zero-filled 1024-byte HttpData.
// NOTE(review): because 'count' is not updated inside the loop, a first request with n > 1
// would emit the headers n times; and 'i' is an int compared against the long 'n', so a very
// large demand (e.g. Long.MAX_VALUE) would never terminate this loop. Confirm that the
// subscriber used here only requests small amounts.
for (int i = 0; i < n; i++) {
if (count == 0) {
s.onNext(ResponseHeaders.of(HttpStatus.OK));
} else {
s.onNext(HttpData.wrap(new byte[1024]));
}
}
// NOTE(review): compound assignment silently narrows the long 'n' to int.
count += n;
// 10MB
// NOTE(review): nothing prevents onComplete() from being signalled again if request()
// is called after the threshold has been passed — verify against the subscriber.
if (count > 1024 * 10) {
s.onComplete();
}
}
@Override
public void cancel() {
// Cancellation is ignored.
}
}));
}
});
sb.requestTimeout(Duration.of(30, ChronoUnit.SECONDS));
}
/**
 * Makes sure a large file is not cached: every operation on the file returned by
 * {@code HttpFile.ofCached(uncached, 4)} must read the attributes once and then delegate to
 * {@code uncached}.
 */
@Test
public void largeFile() throws Exception {
    // NOTE(review): presumably (5, 0) means a 5-byte length, i.e. above the limit of 4 passed to
    // ofCached() below — confirm against HttpFileAttributes.
    final HttpFileAttributes attributes = new HttpFileAttributes(5, 0);
    final ResponseHeaders responseHeaders = ResponseHeaders.of(200);
    final HttpResponse streamed = HttpResponse.of("large");
    final AggregatedHttpFile plainAggregate = HttpFile.of(HttpData.ofUtf8("large"), 0);
    final AggregatedHttpFile pooledAggregate = HttpFile.of(HttpData.ofUtf8("large"), 0);

    // Stub every operation of the delegate.
    final HttpFile uncached = mock(HttpFile.class);
    when(uncached.readAttributes(executor)).thenReturn(UnmodifiableFuture.completedFuture(attributes));
    when(uncached.readHeaders(executor)).thenReturn(UnmodifiableFuture.completedFuture(responseHeaders));
    when(uncached.read(any(), any())).thenReturn(UnmodifiableFuture.completedFuture(streamed));
    when(uncached.aggregate(any())).thenReturn(UnmodifiableFuture.completedFuture(plainAggregate));
    when(uncached.aggregateWithPooledObjects(any(), any()))
            .thenReturn(UnmodifiableFuture.completedFuture(pooledAggregate));

    final HttpFile cached = HttpFile.ofCached(uncached, 4);

    // read() should be delegated to 'uncached'.
    assertThat(cached.read(executor, alloc).join()).isSameAs(streamed);
    verify(uncached, times(1)).readAttributes(executor);
    verify(uncached, times(1)).read(executor, alloc);
    verifyNoMoreInteractions(uncached);
    clearInvocations(uncached);

    // aggregate() should be delegated to 'uncached'.
    assertThat(cached.aggregate(executor).join()).isSameAs(plainAggregate);
    verify(uncached, times(1)).readAttributes(executor);
    verify(uncached, times(1)).aggregate(executor);
    verifyNoMoreInteractions(uncached);
    clearInvocations(uncached);

    // aggregateWithPooledObjects() should be delegated to 'uncached'.
    assertThat(cached.aggregateWithPooledObjects(executor, alloc).join())
            .isSameAs(pooledAggregate);
    verify(uncached, times(1)).readAttributes(executor);
    verify(uncached, times(1)).aggregateWithPooledObjects(executor, alloc);
    verifyNoMoreInteractions(uncached);
    clearInvocations(uncached);
}
/**
 * Checks the custom update handling of {@code /hc_custom}: {@code "KO"} makes the service unhealthy,
 * {@code "OK"} makes it healthy again, and {@code "NOOP"} leaves the state untouched.
 */
@Test
void custom() {
    final WebClient client = WebClient.of(server.httpUri());

    // Make unhealthy.
    final AggregatedHttpResponse unhealthyRes =
            client.execute(RequestHeaders.of(HttpMethod.PUT, "/hc_custom"), "KO")
                  .aggregate().join();
    final AggregatedHttpResponse expectedUnhealthy = AggregatedHttpResponse.of(
            ResponseHeaders.of(HttpStatus.SERVICE_UNAVAILABLE,
                               HttpHeaderNames.CONTENT_TYPE, MediaType.PLAIN_TEXT_UTF_8,
                               "armeria-lphc", "60, 5"),
            HttpData.ofUtf8("not ok"));
    assertThat(unhealthyRes).isEqualTo(expectedUnhealthy);

    // Make healthy.
    final AggregatedHttpResponse healthyRes =
            client.execute(RequestHeaders.of(HttpMethod.PUT, "/hc_custom"), "OK")
                  .aggregate().join();
    final AggregatedHttpResponse expectedHealthy = AggregatedHttpResponse.of(
            ResponseHeaders.of(HttpStatus.OK,
                               HttpHeaderNames.CONTENT_TYPE, MediaType.PLAIN_TEXT_UTF_8,
                               "armeria-lphc", "60, 5"),
            HttpData.ofUtf8("ok"));
    assertThat(healthyRes).isEqualTo(expectedHealthy);

    // Send a no-op request.
    final AggregatedHttpResponse noopRes =
            client.execute(RequestHeaders.of(HttpMethod.PUT, "/hc_custom"), "NOOP")
                  .aggregate().join();
    final AggregatedHttpResponse expectedAfterNoop = AggregatedHttpResponse.of(
            ResponseHeaders.of(HttpStatus.OK,
                               HttpHeaderNames.CONTENT_TYPE, MediaType.PLAIN_TEXT_UTF_8,
                               "armeria-lphc", "60, 5"),
            HttpData.ofUtf8("ok"));
    assertThat(noopRes).isEqualTo(expectedAfterNoop);
}
/**
 * Creates a new {@link HttpResponseWriter} whose content is converted from the objects collected
 * out of the given {@link Stream}.
 *
 * @param stream the sequence of objects to collect
 * @param headers the headers to be written to the returned {@link HttpResponseWriter}
 * @param trailers the trailers to be written to the returned {@link HttpResponseWriter}
 * @param contentConverter the function turning the collected objects into the response content
 * @param executor the executor which runs the collecting job
 * @throws NullPointerException if any argument is {@code null}
 */
public static HttpResponseWriter aggregateFrom(Stream<?> stream,
                                               ResponseHeaders headers, HttpHeaders trailers,
                                               Function<Object, HttpData> contentConverter,
                                               Executor executor) {
    // Validate everything up front, in declaration order, before any work is scheduled.
    requireNonNull(stream, "stream");
    requireNonNull(headers, "headers");
    requireNonNull(trailers, "trailers");
    requireNonNull(contentConverter, "contentConverter");
    requireNonNull(executor, "executor");

    return aggregateFrom(collectFrom(stream, executor),
                         headers, trailers, contentConverter);
}
/**
 * Registers the {@code /slow} service, which answers with a streaming response that has its headers
 * and one piece of data written. The writer is returned without being closed in this method.
 */
@Override
protected void configure(final ServerBuilder sb) throws Exception {
    sb.service("/slow", (ctx, req) -> {
        final HttpResponseWriter writer = HttpResponse.streaming();
        writer.write(ResponseHeaders.of(HttpStatus.OK));
        writer.write(HttpData.ofUtf8("slow response"));
        return writer;
    });
}
/**
 * Builds an {@code ArmeriaPendingResult} for the given request.
 *
 * <p>One {@link WebClient} per host name is created lazily and kept in {@code httpClients}. The
 * {@code X-Goog-Maps-Experience-ID} header is added only when {@code experienceIdHeaderValue} is
 * non-null.
 *
 * <p>The parameters {@code userAgent}, {@code errorTimeout}, {@code maxRetries} and
 * {@code exceptionsAllowedToRetry} are not used by this method body.
 *
 * @param method the HTTP method of the request
 * @param hostName the host the client is built for
 * @param url the path of the request headers
 * @param payload the request content
 * @param clazz the response type handed to the pending result
 * @param fieldNamingPolicy the policy used to pick the Gson instance
 * @return the pending result wrapping the client and the request
 */
private <T, R extends ApiResponse<T>> PendingResult<T> handleMethod(
        HttpMethod method,
        String hostName,
        String url,
        HttpData payload,
        String userAgent,
        String experienceIdHeaderValue,
        Class<R> clazz,
        FieldNamingPolicy fieldNamingPolicy,
        long errorTimeout,
        Integer maxRetries,
        ExceptionsAllowedToRetry exceptionsAllowedToRetry) {
    // Reuse the client for this host if there is one already.
    var webClient = httpClients.computeIfAbsent(
            hostName,
            host -> WebClient.builder(host).factory(clientFactory).options(clientOptions).build());
    var gson = gsonForPolicy(fieldNamingPolicy);

    var requestHeaders = RequestHeaders.builder(method, url);
    if (experienceIdHeaderValue != null) {
        requestHeaders.add("X-Goog-Maps-Experience-ID", experienceIdHeaderValue);
    }

    var httpRequest = HttpRequest.of(requestHeaders.build(), payload);
    return new ArmeriaPendingResult<>(webClient, httpRequest, clazz, gson);
}