下面列出了如何使用 io.reactivex.netty.protocol.http.client.HttpClientResponse 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。
/**
 * Exposes an {@code HttpNewsService} bean whose single operation issues a GET against the
 * server listening on {@code NEWS_SERVER_PORT} and emits each {@code News} item found in
 * the JSON array carried by every content buffer.
 *
 * @return the news service backed by an RxNetty HTTP client
 * @throws RuntimeException (from the returned stream) wrapping the {@link IOException}
 *         raised when a buffer cannot be parsed as a JSON list of {@code News}
 */
@Bean
HttpNewsService externalNews() {
    // Built once and shared by every call: re-creating the mapper and the type token for
    // each received buffer is needless work on the hot path.
    final ObjectMapper mapper = new ObjectMapper();
    final TypeReference<ArrayList<News>> newsListType = new TypeReference<ArrayList<News>>() {};
    return () -> HttpClient.newClient(new InetSocketAddress(NEWS_SERVER_PORT))
            .createGet("")
            .flatMap(HttpClientResponse::getContent)
            .flatMapIterable(bb -> {
                try {
                    // JSON payloads are UTF-8; do not depend on the platform default charset.
                    return mapper.readValue(
                            bb.toString(java.nio.charset.StandardCharsets.UTF_8),
                            newsListType
                    );
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
}
/**
 * Sends the same GET twice through one client and checks the pool statistics reported by
 * the client's listener: two acquires, two releases, one connection and one reuse.
 */
@Test
public void testPoolReuse() throws Exception {
    LoadBalancingHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient(
            IClientConfig.Builder.newBuilder().withDefaultValues()
                    .withMaxAutoRetries(1)
                    .withMaxAutoRetriesNextServer(1).build());
    HttpClientRequest<ByteBuf> getPerson = HttpClientRequest.createGet(SERVICE_URI + "testAsync/person");

    // First round trip.
    Person firstResult = getPersonObservable(client.submit(getPerson)).toBlocking().single();
    assertEquals(EmbeddedResources.defaultPerson, firstResult);

    // Second round trip through the same client.
    Person secondResult = getPersonObservable(client.submit(getPerson)).toBlocking().single();
    assertEquals(EmbeddedResources.defaultPerson, secondResult);

    final HttpClientListener stats = client.getListener();
    assertEquals(2, stats.getPoolAcquires());

    // Releases are polled for (up to the given timeout) instead of being asserted directly.
    Func0<Boolean> bothConnectionsReleased = new Func0<Boolean>() {
        @Override
        public Boolean call() {
            return stats.getPoolReleases() == 2;
        }
    };
    waitUntilTrueOrTimeout(1000, bothConnectionsReleased);

    assertEquals(1, stats.getConnectionCount());
    assertEquals(1, stats.getPoolReuse());
}
/**
 * Verifies that a single response header survives both conversions offered by
 * {@code HttpUtils}: {@code asMap} (checked through the map's entry set) and
 * {@code unescape} (checked through the returned entry list). In both cases the key must
 * be unchanged and the value must equal the URL-decoded original value.
 */
@Test(groups = { "unit" })
public void verifyConversionOfHttpResponseHeadersToMap() throws UnsupportedEncodingException {
    HttpHeaders headersMap = new DefaultHttpHeaders();
    headersMap.add(HttpConstants.HttpHeaders.OWNER_FULL_NAME, OWNER_FULL_NAME_VALUE);
    HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_0,
            HttpResponseStatus.ACCEPTED,
            headersMap);
    HttpResponseHeaders httpResponseHeaders = new HttpClientResponse(httpResponse, null).getHeaders();

    // Conversion to a map.
    Set<Entry<String, String>> resultHeadersSet = HttpUtils.asMap(httpResponseHeaders).entrySet();
    assertThat(resultHeadersSet.size()).isEqualTo(1);
    Entry<String, String> entry = resultHeadersSet.iterator().next();
    assertThat(entry.getKey()).isEqualTo(HttpConstants.HttpHeaders.OWNER_FULL_NAME);
    assertThat(entry.getValue()).isEqualTo(HttpUtils.urlDecode(OWNER_FULL_NAME_VALUE));

    // Conversion to an unescaped entry list.
    List<Entry<String, String>> resultHeadersList = HttpUtils.unescape(httpResponseHeaders.entries());
    assertThat(resultHeadersList.size()).isEqualTo(1);
    // Take the entry from the list itself; reading the map's entry set again would leave
    // the output of unescape() unverified.
    entry = resultHeadersList.get(0);
    assertThat(entry.getKey()).isEqualTo(HttpConstants.HttpHeaders.OWNER_FULL_NAME);
    assertThat(entry.getValue()).isEqualTo(HttpUtils.urlDecode(OWNER_FULL_NAME_VALUE));
}
/**
 * Posts a JSON-serialized {@code Person} as a {@code ByteBuf} body and expects the service
 * to answer with an equal {@code Person}.
 */
@Test
public void testPostWithByteBuf() throws Exception {
    Person expected = new Person("netty", 5);
    byte[] json = new ObjectMapper().writeValueAsBytes(expected);
    ByteBuf body = Unpooled.copiedBuffer(json);

    HttpClientRequest<ByteBuf> post = HttpClientRequest.createPost(SERVICE_URI + "testAsync/person")
            .withHeader("Content-type", "application/json")
            .withHeader("Content-length", String.valueOf(json.length))
            .withContent(body);

    LoadBalancingHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient(
            DefaultClientConfigImpl.getClientConfigWithDefaultValues().set(CommonClientConfigKey.ReadTimeout, 10000));

    Person actual = getPersonObservable(client.submit(post)).toBlocking().single();
    assertEquals(expected, actual);
}
/**
 * Posts the movie's id to {@code /users/{user}/recommendations}.
 *
 * @param user  user whose recommendation list is addressed in the request path
 * @param movie movie whose id is sent as the request content
 * @return an observable that completes without items on a 2xx status and fails with a
 *         {@link RuntimeException} carrying the status otherwise
 */
private Observable<Void> updateRecommendation(String user, Movie movie) {
    String path = format("/users/%s/recommendations", user);
    HttpClientRequest<ByteBuf> post = HttpClientRequest.createPost(path)
            .withHeader("X-Platform-Version", "xyz")
            .withHeader("X-Auth-Token", "abc")
            .withRawContentSource(Observable.just(movie.getId()), new StringTransformer());

    Func1<HttpClientResponse<ByteBuf>, Observable<Void>> failUnlessSuccessful =
            new Func1<HttpClientResponse<ByteBuf>, Observable<Void>>() {
                @Override
                public Observable<Void> call(HttpClientResponse<ByteBuf> response) {
                    // Integer division maps every 2xx code to 2.
                    boolean success = response.getStatus().code() / 100 == 2;
                    if (success) {
                        return Observable.empty();
                    }
                    return Observable.error(new RuntimeException(
                            format("HTTP request failed (status code=%s)", response.getStatus())));
                }
            };
    return client.submit(post).flatMap(failUnlessSuccessful);
}
/**
 * Submits a GET built from an absolute URI through a default client and checks the
 * returned {@code Person} plus the listener's connection and pool-acquire counters.
 */
@Test
public void testSubmitToAbsoluteURI() throws Exception {
    LoadBalancingHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient();
    HttpClientRequest<ByteBuf> getPerson = HttpClientRequest.createGet(SERVICE_URI + "testAsync/person");

    Person fetched = getPersonObservable(client.submit(getPerson)).toBlocking().single();
    assertEquals(EmbeddedResources.defaultPerson, fetched);

    final HttpClientListener stats = client.getListener();
    assertEquals(1, stats.getConnectionCount());
    assertEquals(1, stats.getPoolAcquires());

    // The connection release is polled for (up to the given timeout) rather than asserted directly.
    Func0<Boolean> connectionReleased = new Func0<Boolean>() {
        @Override
        public Boolean call() {
            return stats.getPoolReleases() == 1;
        }
    };
    waitUntilTrueOrTimeout(1000, connectionReleased);
}
/**
 * Builds a Mockito mock of {@code HttpClientResponse} from this object's {@code status},
 * {@code content} and {@code httpHeaders}.
 *
 * @return the stubbed response, or {@code null} when a network failure is configured
 * @throws IllegalStateException if {@code HttpResponseHeaders} cannot be instantiated
 *         through its non-public constructor
 */
public HttpClientResponse<ByteBuf> asHttpClientResponse() {
    if (this.networkFailure != null) {
        return null;
    }

    HttpClientResponse<ByteBuf> mocked = Mockito.mock(HttpClientResponse.class);
    Mockito.doReturn(HttpResponseStatus.valueOf(status)).when(mocked).getStatus();

    Observable<ByteBuf> body = Observable.just(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, content));
    Mockito.doReturn(body).when(mocked).getContent();

    DefaultHttpResponse nettyResponse =
            new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(status), httpHeaders);
    try {
        // The constructor is looked up reflectively and made accessible before use.
        Constructor<HttpResponseHeaders> headersCtor =
                HttpResponseHeaders.class.getDeclaredConstructor(HttpResponse.class);
        headersCtor.setAccessible(true);
        Mockito.doReturn(headersCtor.newInstance(nettyResponse)).when(mocked).getHeaders();
    } catch (ReflectiveOperationException | IllegalArgumentException | SecurityException e) {
        throw new IllegalStateException("Failed to instantiate class object.", e);
    }
    return mocked;
}
/**
 * Creates a mocked {@code CompositeHttpClient} whose {@code submit} records every
 * (request, server) pair in {@code requests} and then answers with the outcome produced by
 * {@code httpClientResponseOrException}, optionally after a delay.
 *
 * @param responseAfterMillis delay before answering; values {@code <= 0} answer immediately
 * @param httpClientResponse  response to answer with (expected to be null when {@code e} is set)
 * @param e                   failure to answer with (expected to be null when a response is set)
 */
private HttpClientMockWrapper(long responseAfterMillis, final HttpClientResponse<ByteBuf> httpClientResponse, final Exception e) {
    httpClient = Mockito.mock(CompositeHttpClient.class);
    // At most one of the two outcomes may be supplied.
    assert httpClientResponse == null || e == null;

    Mockito.doAnswer((Answer<Observable<HttpClientResponse<ByteBuf>>>) invocation -> {
        RxClient.ServerInfo target = invocation.getArgumentAt(0, RxClient.ServerInfo.class);
        HttpClientRequest<ByteBuf> submitted = invocation.getArgumentAt(1, HttpClientRequest.class);
        requests.add(new ImmutablePair<>(submitted, target));

        if (responseAfterMillis > 0) {
            // Defer the outcome until the timer fires.
            return Observable.timer(responseAfterMillis, TimeUnit.MILLISECONDS)
                    .flatMap(tick -> httpClientResponseOrException(httpClientResponse, e));
        }
        return httpClientResponseOrException(httpClientResponse, e);
    }).when(httpClient).submit(Mockito.any(RxClient.ServerInfo.class), Mockito.any(HttpClientRequest.class));
}
/**
 * Attempts to read the body of an error response as UTF-8 text.
 * <ul>
 *   <li>No {@code Content-Length} header, or a length of zero: emits an empty string.</li>
 *   <li>Positive length and a {@code Content-Type} starting with {@code text/plain}: emits
 *       each content buffer decoded as UTF-8.</li>
 *   <li>Positive length and any other (or missing) content type: the content is ignored and
 *       the message produced by {@code getErrMsg(contentType)} is emitted instead.</li>
 * </ul>
 * <p>
 * <b>NOTE:</b>
 * <i>
 * This method MUST be called from the netty-io thread, otherwise the content of the response will not be
 * available because it will be released automatically as soon as the netty-io thread is left.
 * </i>
 * @param resp The response to attempt to read from
 * @return An {@link Observable} of the text content if it was read, otherwise an error
 * message naming the content type that was not read, or an empty string when there is no body.
 */
@NotNull
static Observable<String> attemptToReadErrorResponse(@NotNull final HttpClientResponse<ByteBuf> resp) {
    final HttpResponseHeaders headers = resp.getHeaders();
    final String contentType = resp.getHeaders().get(HttpHeaderNames.CONTENT_TYPE);

    final boolean hasBody = headers.isContentLengthSet() && headers.getContentLength() > 0;
    if (!hasBody) {
        return Observable.just("");
    }

    final boolean plainText = contentType != null && contentType.startsWith("text/plain");
    if (!plainText) {
        // Content that is not going to be read is explicitly ignored.
        resp.ignoreContent();
        return Observable.just(getErrMsg(contentType));
    }

    return resp.getContent()
            .map(buffer -> buffer.toString(StandardCharsets.UTF_8));
}
/**
 * For a non-text ({@code application/json}) error response the body must not be returned
 * as the error text, and the response content must have been disposed: a later attempt to
 * read it has to fail with the "already disposed" {@link IllegalStateException}.
 */
@Test
public void attemptToReadErrorResponse_responseContentIgnoredByDefaultWhenNotString() throws Exception {
    final String errMsg = "lies";
    final byte[] bytes = errMsg.getBytes(StandardCharsets.UTF_8);
    final HttpClientResponse<ByteBuf> resp = response(Unpooled.copiedBuffer(bytes), (headers) -> {
        headers.add("Content-Type", "application/json;charset=utf-8");
        headers.add("Content-Length", bytes.length);
    });
    final String err = ResponseUtils.attemptToReadErrorResponse(resp).toBlocking().first();
    assertThat(err).isNotEqualTo("lies");

    // Track whether the expected failure actually happened; a bare try/catch would let the
    // test pass even when reading the content succeeds.
    boolean contentDisposed = false;
    try {
        resp.getContent().toBlocking().first();
    } catch (IllegalStateException e) {
        contentDisposed = true;
        assertThat(e.getMessage()).isEqualTo("Content stream is already disposed.");
    }
    assertThat(contentDisposed).isTrue();
}
/**
 * Posts the given movie to {@code /movies}.
 *
 * @param movie movie sent as the request content (serialized by {@code RxMovieTransformer})
 * @return an observable that completes without items on a 2xx status and fails with a
 *         {@link RuntimeException} carrying the status otherwise
 */
private Observable<Void> registerMovie(Movie movie) {
    HttpClientRequest<ByteBuf> post = HttpClientRequest.createPost("/movies")
            .withHeader("X-Platform-Version", "xyz")
            .withHeader("X-Auth-Token", "abc")
            .withRawContentSource(Observable.just(movie), new RxMovieTransformer());

    Func1<HttpClientResponse<ByteBuf>, Observable<Void>> failUnlessSuccessful =
            new Func1<HttpClientResponse<ByteBuf>, Observable<Void>>() {
                @Override
                public Observable<Void> call(HttpClientResponse<ByteBuf> response) {
                    // Integer division maps every 2xx code to 2.
                    boolean success = response.getStatus().code() / 100 == 2;
                    if (success) {
                        return Observable.empty();
                    }
                    return Observable.error(new RuntimeException(
                            format("HTTP request failed (status code=%s)", response.getStatus())));
                }
            };
    return client.submit(post).flatMap(failUnlessSuccessful);
}
/**
 * Builds a load-balancing HTTP client for the given configuration. The client is set up
 * with the listener from {@code createBearerHeaderAdder()}, the default HTTP retry handler
 * for {@code config}, a pipeline that adds HTTP object aggregation sized by
 * {@code maxChunkSize}, and Ribbon's shared pool-cleaner scheduler.
 *
 * @param config client configuration passed to the builder and the retry handler
 * @return the configured client
 */
@Override
public HttpClient<ByteBuf, ByteBuf> newHttpClient(final IClientConfig config) {
    final List<ExecutionListener<HttpClientRequest<ByteBuf>, HttpClientResponse<ByteBuf>>> executionListeners =
            new ArrayList<>();
    executionListeners.add(createBearerHeaderAdder());

    final PipelineConfiguratorComposite<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>> aggregatingPipeline =
            new PipelineConfiguratorComposite<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>>(
                    new HttpClientPipelineConfigurator<ByteBuf, ByteBuf>(),
                    new HttpObjectAggregationConfigurator(maxChunkSize));

    return LoadBalancingHttpClient.<ByteBuf, ByteBuf>builder()
            .withClientConfig(config)
            .withExecutorListeners(executionListeners)
            .withRetryHandler(getDefaultHttpRetryHandlerWithConfig(config))
            .withPipelineConfigurator(aggregatingPipeline)
            .withPoolCleanerScheduler(RibbonTransport.poolCleanerScheduler)
            .build();
}
/**
 * Demo entry point: issues a GET to {@code http://www.google.com/}, prints the status and
 * each content buffer, then waits up to two seconds for the first buffer to be printed.
 *
 * @param args unused
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings
public static void main(String[] args) throws Exception {
    final CountDownLatch firstChunkPrinted = new CountDownLatch(1);

    // Prints one content buffer and signals the latch.
    final Action1<ByteBuf> printContent = new Action1<ByteBuf>() {
        @Override
        public void call(ByteBuf content) {
            System.out.println("Response content: " + content.toString(Charset.defaultCharset()));
            firstChunkPrinted.countDown();
        }
    };
    // Prints the status line and subscribes the content printer to the body.
    Action1<HttpClientResponse<ByteBuf>> printResponse = new Action1<HttpClientResponse<ByteBuf>>() {
        @Override
        public void call(HttpClientResponse<ByteBuf> response) {
            System.out.println("Status code: " + response.getStatus());
            response.getContent().subscribe(printContent);
        }
    };

    LoadBalancingHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient();
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("http://www.google.com/");
    client.submit(request)
            .toBlocking()
            .forEach(printResponse);

    firstChunkPrinted.await(2, TimeUnit.SECONDS);
}
/**
 * Submits the request, decodes the response content with the platform default charset and
 * blocks until the first decoded chunk is available.
 *
 * @param request request to submit
 * @param client  client used to submit it
 * @return the first content chunk of the response as a string
 */
public static String getResponse(HttpClientRequest<ByteBuf> request, HttpClient<ByteBuf, ByteBuf> client) {
    final Func1<ByteBuf, String> decode = new Func1<ByteBuf, String>() {
        @Override
        public String call(ByteBuf byteBuf) {
            return byteBuf.toString(Charset.defaultCharset());
        }
    };
    Func1<HttpClientResponse<ByteBuf>, Observable<String>> readBody =
            new Func1<HttpClientResponse<ByteBuf>, Observable<String>>() {
                @Override
                public Observable<String> call(HttpClientResponse<ByteBuf> response) {
                    return response.getContent().map(decode);
                }
            };
    // The error handler does not recover: it rethrows whatever it is handed.
    Func1<OnErrorThrowable, Observable<String>> rethrow =
            new Func1<OnErrorThrowable, Observable<String>>() {
                @Override
                public Observable<String> call(OnErrorThrowable onErrorThrowable) {
                    throw onErrorThrowable;
                }
            };

    return client.submit(request)
            .flatMap(readBody)
            .onErrorFlatMap(rethrow)
            .toBlocking()
            .first();
}
/**
 * Creates the command. {@code setter} is handed to the superclass constructor; every other
 * argument is stored unchanged in the field of the same name.
 *
 * @param httpClient        client stored for later use by the command
 * @param httpRequest       request stored for later use by the command
 * @param hystrixCacheKey   cache key value kept by the command
 * @param requestProperties request properties kept by the command
 * @param fallbackHandler   fallback handler kept by the command
 * @param validator         response validator kept by the command
 * @param classType         class token kept by the command
 * @param setter            Hystrix setter passed to {@code super}
 */
public HttpResourceObservableCommand(HttpClient<ByteBuf, ByteBuf> httpClient,
                                     HttpClientRequest<ByteBuf> httpRequest, String hystrixCacheKey,
                                     Map<String, Object> requestProperties,
                                     FallbackHandler<T> fallbackHandler,
                                     ResponseValidator<HttpClientResponse<ByteBuf>> validator,
                                     Class<? extends T> classType,
                                     HystrixObservableCommand.Setter setter) {
    super(setter);

    // Transport.
    this.httpClient = httpClient;
    this.httpRequest = httpRequest;

    // Request metadata.
    this.hystrixCacheKey = hystrixCacheKey;
    this.requestProperties = requestProperties;

    // Response handling.
    this.validator = validator;
    this.fallbackHandler = fallbackHandler;
    this.classType = classType;
}
/**
 * Fetches the default person through a default client and checks the listener's
 * pool-acquire and connection counters after a single request.
 */
@Test
public void testObservable() throws Exception {
    LoadBalancingHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient();
    HttpClientRequest<ByteBuf> getPerson = HttpClientRequest.createGet(SERVICE_URI + "testAsync/person");

    Person fetched = getPersonObservable(client.submit(getPerson)).toBlocking().single();
    assertEquals(EmbeddedResources.defaultPerson, fetched);

    final HttpClientListener stats = client.getListener();
    assertEquals(1, stats.getPoolAcquires());
    assertEquals(1, stats.getConnectionCount());

    // The connection release is polled for (up to the given timeout) rather than asserted directly.
    Func0<Boolean> connectionReleased = new Func0<Boolean>() {
        @Override
        public Boolean call() {
            return stats.getPoolReleases() == 1;
        }
    };
    waitUntilTrueOrTimeout(1000, connectionReleased);
}
/**
 * Builds an HTTP request from the given service request and submits it to the server
 * named by the request's URI.
 * <p>
 * The body is taken from {@code request.getContentObservable()} when present (each
 * {@code byte[]} is wrapped in a {@code ByteBuf}), otherwise from
 * {@code request.getContent()} when present; otherwise no body is attached.
 *
 * @param request service request supplying URI, headers and optional content
 * @param method  HTTP method to use
 * @return the converted response, observed on the computation scheduler; any exception
 *         thrown while building or submitting the request is returned as an error observable
 */
public Observable<RxDocumentServiceResponse> performRequest(RxDocumentServiceRequest request, HttpMethod method) {
    try {
        URI target = getUri(request);
        HttpClientRequest<ByteBuf> outgoing = HttpClientRequest.create(method, target.toString());
        this.fillHttpRequestBaseWithHeaders(request.getHeaders(), outgoing);

        if (request.getContentObservable() != null) {
            // TODO validate this
            // convert byte[] to ByteBuf
            // why not use Observable<byte[]> directly?
            Observable<ByteBuf> wrappedChunks = request.getContentObservable()
                    .map(chunk -> Unpooled.wrappedBuffer(chunk));
            outgoing.withContentSource(wrappedChunks);
        } else if (request.getContent() != null) {
            outgoing.withContent(request.getContent());
        }

        Observable<HttpClientResponse<ByteBuf>> rawResponse = this.httpClient.submit(
                new RxClient.ServerInfo(target.getHost(), target.getPort()), outgoing);
        return toDocumentServiceResponse(rawResponse, request)
                .observeOn(Schedulers.computation());
    } catch (Exception e) {
        return Observable.error(e);
    }
}
/**
 * Stubs the HTTP client to return a mocked database-account response and checks that the
 * configuration reader yields the expected {@code DatabaseAccount}.
 */
@Test(groups = "simple")
public void mockInitializeReaderAsync() throws Exception {
    // The mock response is fully built before the submit() stubbing starts.
    HttpClientResponse<ByteBuf> cannedResponse = getMockResponse(databaseAccountJson);
    Mockito.when(mockHttpClient.submit(Matchers.any(RxClient.ServerInfo.class), Matchers.any()))
            .thenReturn(Observable.just(cannedResponse));

    validateSuccess(mockGatewayServiceConfigurationReader.initializeReaderAsync(), expectedDatabaseAccount);
}
/**
 * Same scenario as the plain reader test, but with a reader constructed with the
 * {@code "SampleResourceToken"} argument.
 */
@Test(groups = "simple")
public void mockInitializeReaderAsyncWithResourceToken() throws Exception {
    mockGatewayServiceConfigurationReader = new GatewayServiceConfigurationReader(
            new URI(TestConfigurations.HOST),
            true,
            "SampleResourceToken",
            connectionPolicy,
            baseAuthorizationTokenProvider,
            mockHttpClient);

    // The mock response is fully built before the submit() stubbing starts.
    HttpClientResponse<ByteBuf> cannedResponse = getMockResponse(databaseAccountJson);
    Mockito.when(mockHttpClient.submit(Matchers.any(RxClient.ServerInfo.class), Matchers.any()))
            .thenReturn(Observable.just(cannedResponse));

    validateSuccess(mockGatewayServiceConfigurationReader.initializeReaderAsync(), expectedDatabaseAccount);
}
/**
 * Creates a mocked 200 response whose content is the given JSON (UTF-8) and whose headers
 * carry a content-length equal to the number of bytes written.
 *
 * @param databaseAccountJson JSON text served as the response body
 * @return the stubbed response
 * @throws IllegalStateException if {@code HttpResponseHeaders} cannot be instantiated
 *         through its non-public constructor
 */
private HttpClientResponse<ByteBuf> getMockResponse(String databaseAccountJson) {
    HttpClientResponse<ByteBuf> mocked = Mockito.mock(HttpClientResponse.class);
    Mockito.doReturn(HttpResponseStatus.valueOf(200)).when(mocked).getStatus();

    ByteBuf payload = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, databaseAccountJson);
    Mockito.doReturn(Observable.just(payload))
            .when(mocked).getContent();

    // writerIndex() is the number of bytes written into the freshly allocated buffer.
    HttpHeaders nettyHeaders = new DefaultHttpHeaders()
            .add(HttpConstants.HttpHeaders.CONTENT_LENGTH, payload.writerIndex());
    DefaultHttpResponse nettyResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
            HttpResponseStatus.valueOf(200), nettyHeaders);
    try {
        // The constructor is looked up reflectively and made accessible before use.
        Constructor<HttpResponseHeaders> headersCtor = HttpResponseHeaders.class
                .getDeclaredConstructor(HttpResponse.class);
        headersCtor.setAccessible(true);
        Mockito.doReturn(headersCtor.newInstance(nettyResponse)).when(mocked).getHeaders();
    } catch (ReflectiveOperationException | IllegalArgumentException | SecurityException e) {
        throw new IllegalStateException("Failed to instantiate class object.", e);
    }
    return mocked;
}
/**
 * Wraps the supplied HTTP client in a Mockito spy that records every submitted request in
 * {@code httpRequests} before delegating to the original client, then creates the gateway
 * proxy through the superclass using that spy.
 *
 * @param rxOrigClient the real client; kept in {@code origHttpClient} and spied on
 * @return the gateway proxy created by the superclass with the spying client
 */
RxGatewayStoreModel createRxGatewayProxy(
        ISessionContainer sessionContainer,
        ConsistencyLevel consistencyLevel,
        QueryCompatibilityMode queryCompatibilityMode,
        UserAgentContainer userAgentContainer,
        GlobalEndpointManager globalEndpointManager,
        CompositeHttpClient<ByteBuf, ByteBuf> rxOrigClient) {
    origHttpClient = rxOrigClient;
    spyHttpClient = Mockito.spy(rxOrigClient);

    doAnswer((Answer<Observable<HttpClientResponse<ByteBuf>>>) invocation -> {
        RxClient.ServerInfo target = invocation.getArgumentAt(0, RxClient.ServerInfo.class);
        HttpClientRequest<ByteBuf> submitted = invocation.getArgumentAt(1, HttpClientRequest.class);
        // Record first, then forward to the real client.
        httpRequests.add(submitted);
        return origHttpClient.submit(target, submitted);
    }).when(spyHttpClient).submit( anyObject(),
            (HttpClientRequest) anyObject());

    return super.createRxGatewayProxy(
            sessionContainer,
            consistencyLevel,
            queryCompatibilityMode,
            userAgentContainer,
            globalEndpointManager,
            spyHttpClient);
}
/**
 * Creates a load-balancing client whose response content type is {@code ServerSentEvent}.
 * The client is built with the given configuration, the retry handler returned by
 * {@code getDefaultHttpRetryHandlerWithConfig(config)} and the given pipeline configurator.
 *
 * @param pipelineConfigurator pipeline configurator passed to the builder
 * @param config               client configuration passed to the builder and used to
 *                             obtain the retry handler
 * @param <I>                  request content type
 * @return the built client
 */
public static <I> LoadBalancingHttpClient<I, ServerSentEvent> newSSEClient(PipelineConfigurator<HttpClientResponse<ServerSentEvent>, HttpClientRequest<I>> pipelineConfigurator,
IClientConfig config) {
return SSEClient.<I>sseClientBuilder()
.withClientConfig(config)
.withRetryHandler(getDefaultHttpRetryHandlerWithConfig(config))
.withPipelineConfigurator(pipelineConfigurator)
.build();
}
/**
 * Parses the HTTP response with {@code HttpClientUtils.parseResponseAsync} and extracts the
 * {@code DatabaseAccount} resource from the parsed result.
 *
 * @param clientResponseObservable single emitting the raw HTTP response
 * @return single emitting the database account read from that response
 */
private Single<DatabaseAccount> toDatabaseAccountObservable(
        Single<HttpClientResponse<ByteBuf>> clientResponseObservable) {
    return clientResponseObservable.flatMap(rawResponse ->
            HttpClientUtils.parseResponseAsync(rawResponse)
                    .map(parsed -> parsed.getResource(DatabaseAccount.class)));
}
/**
 * Configures a read timeout of {@code "100"} on the client, calls the
 * {@code testAsync/readTimeout} endpoint and expects the subscription to end with a Netty
 * {@code ReadTimeoutException}.
 */
@Test
public void testReadTimeout() throws Exception {
    LoadBalancingHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient(
            DefaultClientConfigImpl.getClientConfigWithDefaultValues()
                    .withProperty(CommonClientConfigKey.ReadTimeout, "100"));
    HttpClientRequest<ByteBuf> slowGet = HttpClientRequest.createGet(SERVICE_URI + "testAsync/readTimeout");

    ObserverWithLatch<HttpClientResponse<ByteBuf>> outcome = new ObserverWithLatch<HttpClientResponse<ByteBuf>>();
    client.submit(slowGet).subscribe(outcome);
    outcome.await();

    assertTrue(outcome.error instanceof io.netty.handler.timeout.ReadTimeoutException);
}
/**
 * Converts an HTTP client response into a {@code StoreResponse} built from the status
 * code, the unescaped header entries and the response content read as a string.
 *
 * @param clientResponse response to convert
 * @return single emitting the store response; an exception thrown while constructing it
 *         is delivered as an error
 */
public static Single<StoreResponse> toStoreResponse(HttpClientResponse<ByteBuf> clientResponse) {
    HttpResponseHeaders headers = clientResponse.getHeaders();
    HttpResponseStatus status = clientResponse.getStatus();

    Observable<String> body;
    if (clientResponse.getContent() != null) {
        // Turn the Observable<ByteBuf> content into an Observable<String>; the
        // content-length header (or -1 when absent) is passed along to the conversion.
        body = toString(clientResponse.getContent(),
                clientResponse.getHeaders().getIntHeader(HttpConstants.HttpHeaders.CONTENT_LENGTH, -1));
    } else {
        // for delete we don't expect any body
        body = Observable.just(null);
    }

    return body
            .flatMap(content -> {
                try {
                    // Assemble the StoreResponse from status, headers and content.
                    return Observable.just(
                            new StoreResponse(status.code(), HttpUtils.unescape(headers.entries()), content));
                } catch (Exception e) {
                    return Observable.error(e);
                }
            })
            .toSingle();
}
/**
 * Reads the error text of a response.
 *
 * @param responseMessage response to read from
 * @return the result of {@code getErrorFromStream} for the response content (given the
 *         content-length header, or -1 when absent), or an empty string when the response
 *         has no content observable
 */
protected static Single<String> getErrorResponseAsync(HttpClientResponse<ByteBuf> responseMessage) {
    if (responseMessage.getContent() != null) {
        return getErrorFromStream(
                responseMessage.getContent(),
                responseMessage.getHeaders().getIntHeader(HttpConstants.HttpHeaders.CONTENT_LENGTH, -1));
    }
    return Single.just(StringUtils.EMPTY);
}
/**
 * Registers a movie on the test server, posts its id to the user's recommendations
 * endpoint and checks for an OK status and that the id was stored for the user.
 */
@Test
public void testUpateRecommendations() {
    movieServer.movies.put(ORANGE_IS_THE_NEW_BLACK.getId(), ORANGE_IS_THE_NEW_BLACK);

    // Maps each response to its status.
    Func1<HttpClientResponse<ByteBuf>, Observable<HttpResponseStatus>> toStatus =
            new Func1<HttpClientResponse<ByteBuf>, Observable<HttpResponseStatus>>() {
                @Override
                public Observable<HttpResponseStatus> call(HttpClientResponse<ByteBuf> response) {
                    return Observable.just(response.getStatus());
                }
            };

    HttpResponseStatus statusCode = RxNetty.createHttpPost(
                    baseURL + "/users/" + TEST_USER_ID + "/recommendations",
                    Observable.just(ORANGE_IS_THE_NEW_BLACK.getId()),
                    new StringTransformer())
            .flatMap(toStatus)
            .toBlocking()
            .first();

    assertEquals(HttpResponseStatus.OK, statusCode);
    assertTrue(movieServer.userRecommendations.get(TEST_USER_ID).contains(ORANGE_IS_THE_NEW_BLACK.getId()));
}
/**
 * Prints a banner, the status line ({@code version code reason}) and every header of the
 * response as {@code name: value} to standard output.
 *
 * @param response response whose status line and headers are printed
 */
private static void printResponseHeader(HttpClientResponse<ServerSentEvent> response) {
    System.out.println("New response received.");
    System.out.println("========================");

    StringBuilder statusLine = new StringBuilder();
    statusLine.append(response.getHttpVersion().text())
            .append(' ')
            .append(response.getStatus().code())
            .append(' ')
            .append(response.getStatus().reasonPhrase());
    System.out.println(statusLine);

    for (Map.Entry<String, String> headerEntry : response.getHeaders().entries()) {
        String name = headerEntry.getKey();
        String value = headerEntry.getValue();
        System.out.println(name + ": " + value);
    }
}
/**
 * Posts a {@code Person} serialized through {@code SerializationUtils} to an explicitly
 * chosen server and expects the service to answer with an equal {@code Person}.
 */
@Test
public void testPostWithObservable() throws Exception {
    Person expected = new Person("netty", 5);
    HttpClientRequest<ByteBuf> post = HttpClientRequest.createPost(SERVICE_URI + "testAsync/person")
            .withHeader("Content-type", "application/json")
            .withContent(SerializationUtils.serializeToBytes(JacksonCodec.getInstance(), expected, null));

    LoadBalancingHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient(
            DefaultClientConfigImpl.getClientConfigWithDefaultValues().set(CommonClientConfigKey.ReadTimeout, 10000));

    // The target server is passed explicitly to submit().
    Person actual = getPersonObservable(client.submit(new Server(host, port), post))
            .toBlocking()
            .single();
    assertEquals(expected, actual);
}
/**
 * Issues a POST to {@code PATH_2} on the local test server, expects no response content,
 * then waits for exactly one trace on the APM mock server and checks its single
 * {@code Producer} node (URI, empty query property, operation and HTTP method).
 */
@Test
public void testPOST() throws InterruptedException, ExecutionException, TimeoutException {
    SocketAddress target = new InetSocketAddress("127.0.0.1", server.getServerPort());
    // One client bound to the test server address.
    HttpClient<ByteBuf, ByteBuf> client = HttpClient.newClient(target);

    HttpClientRequest<ByteBuf, ByteBuf> post = client.createPost(PATH_2);
    // NOTE(review): the value returned by writeStringContent is not kept, and the request
    // subscribed below is the one created above - confirm the content is actually sent.
    post.writeStringContent(Observable.just(HELLO_THERE));

    Object responseBody = post
            .flatMap((HttpClientResponse<ByteBuf> resp) -> resp.getContent()
                    .map(buf -> buf.toString(Charset.defaultCharset())))
            .singleOrDefault(null)
            .toBlocking()
            .toFuture()
            .get(5, TimeUnit.SECONDS);
    assertNull(responseBody);

    Wait.until(() -> getApmMockServer().getTraces().size() == 1);

    // Check stored traces (including 1 for the test client)
    assertEquals(1, getApmMockServer().getTraces().size());

    List<Producer> producers = NodeUtil.findNodes(getApmMockServer().getTraces().get(0).getNodes(), Producer.class);
    assertEquals("Expecting 1 producers", 1, producers.size());

    Producer postProducer = producers.get(0);
    assertEquals(PATH_2, postProducer.getUri());
    assertTrue(postProducer.getProperties(Constants.PROP_HTTP_QUERY).isEmpty());
    assertEquals("POST", postProducer.getOperation());
    assertEquals("POST", postProducer.getProperties("http_method").iterator().next().getValue());
}