下面列出了 io.reactivex.netty.protocol.http.client.HttpClientRequest 类的 API 用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Posts the given movie to {@code /movies} and reports the outcome.
 *
 * @param movie the movie handed to the request as its raw content source
 * @return an observable that completes without items when the response status is 2xx,
 *         and fails with a {@link RuntimeException} for any other status
 */
private Observable<Void> registerMovie(Movie movie) {
    HttpClientRequest<ByteBuf> postRequest = HttpClientRequest.createPost("/movies")
            .withHeader("X-Platform-Version", "xyz")
            .withHeader("X-Auth-Token", "abc")
            .withRawContentSource(Observable.just(movie), new RxMovieTransformer());

    Func1<HttpClientResponse<ByteBuf>, Observable<Void>> verifyStatus =
            new Func1<HttpClientResponse<ByteBuf>, Observable<Void>>() {
                @Override
                public Observable<Void> call(HttpClientResponse<ByteBuf> reply) {
                    // Only the 2xx family counts as success.
                    boolean successful = reply.getStatus().code() / 100 == 2;
                    if (successful) {
                        return Observable.empty();
                    }
                    return Observable.error(new RuntimeException(
                            format("HTTP request failed (status code=%s)", reply.getStatus())));
                }
            };

    return client.submit(postRequest).flatMap(verifyStatus);
}
/**
 * Stubs {@code submit(ServerInfo, HttpClientRequest)} on the spy so that every request is
 * recorded in {@code requestsResponsePairs} together with a future for its response headers,
 * while the call itself is still forwarded to {@code origHttpClient}.
 *
 * @param spyClient the Mockito spy whose {@code submit} calls are intercepted
 */
void initRequestCapture(CompositeHttpClient<ByteBuf, ByteBuf> spyClient) {
    Answer captureAndForward = new Answer() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
            RxClient.ServerInfo target = invocation.getArgumentAt(0, RxClient.ServerInfo.class);
            HttpClientRequest<ByteBuf> captured = invocation.getArgumentAt(1, HttpClientRequest.class);

            // The future is completed (or failed) once the real client emits its result.
            CompletableFuture<HttpResponseHeaders> headersFuture = new CompletableFuture<>();
            requestsResponsePairs.add(Pair.of(captured, headersFuture));

            return origHttpClient.submit(target, captured)
                    .doOnNext(res -> headersFuture.complete(res.getHeaders()))
                    .doOnError(headersFuture::completeExceptionally);
        }
    };

    doAnswer(captureAndForward)
            .when(spyClient)
            .submit(Mockito.any(RxClient.ServerInfo.class), Mockito.any(HttpClientRequest.class));
}
/**
 * Turns the HTTP response of a store call into either a {@code StoreResponse} or an error.
 *
 * @param resourceAddress address forwarded to the error-response factory
 * @param request         the request that produced {@code response}
 * @param activityId      activity id attached to the error raised for a missing response
 * @param response        the HTTP response; may be {@code null}
 * @param physicalAddress address attached to the error raised for a missing response
 * @return a failed {@code Single} when {@code response} is {@code null}; otherwise the store
 *         response for success codes, or the result of the error-response factory
 */
private Single<StoreResponse> processHttpResponse(String resourceAddress, HttpClientRequest<ByteBuf> request, String activityId, HttpClientResponse<ByteBuf> response, URI physicalAddress) {
    if (response == null) {
        InternalServerErrorException missingResponse = new InternalServerErrorException(
                String.format(RMResources.ExceptionMessage, RMResources.InvalidBackendResponse),
                null,
                physicalAddress);
        missingResponse.getResponseHeaders().put(HttpConstants.HttpHeaders.ACTIVITY_ID, activityId);
        missingResponse.getResponseHeaders().put(HttpConstants.HttpHeaders.REQUEST_VALIDATION_FAILURE, "1");
        return Single.error(missingResponse);
    }

    // Anything below the error threshold, plus 304 Not Modified, is treated as success
    // and therefore must not raise.
    final int statusCode = response.getStatus().code();
    final boolean success = statusCode < HttpConstants.StatusCodes.MINIMUM_STATUSCODE_AS_ERROR_GATEWAY
            || statusCode == HttpConstants.StatusCodes.NOT_MODIFIED;
    if (!success) {
        return this.createErrorResponseFromHttpResponse(resourceAddress, activityId, request, response);
    }
    return HttpTransportClient.createStoreResponseFromHttpResponse(response);
}
/**
 * Sample entry point: issues a GET to {@code http://www.google.com/}, prints the status and
 * each content chunk, then waits up to two seconds for the first chunk to have been printed.
 *
 * @param args unused
 * @throws Exception if the blocking request or the wait fails
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings
public static void main(String[] args) throws Exception {
    final CountDownLatch contentSeen = new CountDownLatch(1);

    final Action1<ByteBuf> printContent = new Action1<ByteBuf>() {
        @Override
        public void call(ByteBuf content) {
            System.out.println("Response content: " + content.toString(Charset.defaultCharset()));
            contentSeen.countDown();
        }
    };
    final Action1<HttpClientResponse<ByteBuf>> printResponse = new Action1<HttpClientResponse<ByteBuf>>() {
        @Override
        public void call(HttpClientResponse<ByteBuf> t1) {
            System.out.println("Status code: " + t1.getStatus());
            t1.getContent().subscribe(printContent);
        }
    };

    LoadBalancingHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient();
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("http://www.google.com/");
    client.submit(request)
            .toBlocking()
            .forEach(printResponse);

    contentSeen.await(2, TimeUnit.SECONDS);
}
/**
 * Fetches the default person through a fresh client and checks the listener's pool counters:
 * one acquire, one connection, and (eventually) one release.
 */
@Test
public void testObservable() throws Exception {
    LoadBalancingHttpClient<ByteBuf, ByteBuf> observableClient = RibbonTransport.newHttpClient();
    HttpClientRequest<ByteBuf> personRequest = HttpClientRequest.createGet(SERVICE_URI + "testAsync/person");

    Observable<HttpClientResponse<ByteBuf>> personResponse = observableClient.submit(personRequest);
    Person fetched = getPersonObservable(personResponse).toBlocking().single();
    assertEquals(EmbeddedResources.defaultPerson, fetched);

    final HttpClientListener listener = observableClient.getListener();
    assertEquals(1, listener.getPoolAcquires());
    assertEquals(1, listener.getConnectionCount());

    Func0<Boolean> connectionReleased = new Func0<Boolean>() {
        @Override
        public Boolean call() {
            return listener.getPoolReleases() == 1;
        }
    };
    waitUntilTrueOrTimeout(1000, connectionReleased);
}
/**
 * Checks that a request created by the client carries a {@code User-Agent} header that starts
 * with the application entry and also mentions {@code mesos-rxjava-client/}.
 */
@Test
public void testUserAgentContains_MesosRxJavaCore_RxNetty() throws Exception {
    final String clientName = "unit-tests";

    final MesosClient<String, String> client = MesosClientBuilder.<String, String>newBuilder()
            .sendCodec(StringMessageCodec.UTF8_STRING)
            .receiveCodec(StringMessageCodec.UTF8_STRING)
            .mesosUri(URI.create("http://localhost:12345"))
            .applicationUserAgentEntry(literal(clientName, "latest"))
            .subscribe("subscribe")
            .processStream(events -> events.map(e -> Optional.<SinkOperation<String>>empty()))
            .build();

    final HttpClientRequest<ByteBuf> ackRequest = client.createPost.call("ACK").toBlocking().first();
    final Map<String, String> requestHeaders = headersToMap(ackRequest.getHeaders());

    assertThat(requestHeaders).containsKeys("User-Agent");
    final String userAgent = requestHeaders.get("User-Agent");
    final String expectedPrefix = String.format("%s/%s", clientName, "latest");
    assertThat(userAgent).startsWith(expectedPrefix);
    assertThat(userAgent).contains("mesos-rxjava-client/");
}
/**
 * Checks that a request built from a URI with user-info ({@code testuser:testpassword})
 * carries the matching HTTP Basic {@code Authorization} header.
 */
@Test
public void testBasicAuthHeaderAddedToRequestWhenUserInfoPresentInUri() throws Exception {
    final Func1<String, Observable<HttpClientRequest<ByteBuf>>> createPost = MesosClient.curryCreatePost(
            // The URI must carry the user-info that the assertions below decode; the previous
            // literal had been mangled by e-mail obfuscation and was not a valid URI at all.
            URI.create("http://testuser:testpassword@localhost:12345/api/v1/scheduler"),
            StringMessageCodec.UTF8_STRING,
            StringMessageCodec.UTF8_STRING,
            new UserAgent(
                    literal("testing", "latest")
            ),
            new AtomicReference<>(null)
    );

    final HttpClientRequest<ByteBuf> request = createPost.call("something")
            .toBlocking()
            .first();

    final String authHeaderName = HttpHeaderNames.AUTHORIZATION.toString();
    final Map<String, String> headers = headersToMap(request.getHeaders());
    assertThat(headers).containsKeys(authHeaderName);

    final String authorization = headers.get(authHeaderName);
    assertThat(authorization).isEqualTo("Basic dGVzdHVzZXI6dGVzdHBhc3N3b3Jk");

    final String base64UserPass = authorization.substring("Basic ".length());
    // Decode with an explicit charset so the result does not depend on the platform default.
    final String userPass = new String(
            Base64.getDecoder().decode(base64UserPass), java.nio.charset.StandardCharsets.UTF_8);
    assertThat(userPass).isEqualTo("testuser:testpassword");
}
/**
 * Checks that the {@code Mesos-Stream-Id} header is set on the request when the stream-id
 * reference holds a value.
 */
@Test
public void testMesosStreamIdAddedToRequestWhenNonNull() throws Exception {
    final URI schedulerUri = URI.create("http://localhost:12345/api/v1/scheduler");
    final UserAgent userAgent = new UserAgent(literal("testing", "latest"));
    final Func1<String, Observable<HttpClientRequest<ByteBuf>>> createPost = MesosClient.curryCreatePost(
            schedulerUri,
            StringMessageCodec.UTF8_STRING,
            StringMessageCodec.UTF8_STRING,
            userAgent,
            new AtomicReference<>("streamId")
    );

    final HttpClientRequest<ByteBuf> built = createPost.call("something").toBlocking().first();
    final Map<String, String> requestHeaders = headersToMap(built.getHeaders());

    assertThat(requestHeaders).containsKeys("Mesos-Stream-Id");
    assertThat(requestHeaders.get("Mesos-Stream-Id")).isEqualTo("streamId");
}
/**
 * Checks that no {@code Mesos-Stream-Id} header is set on the request when the stream-id
 * reference holds {@code null}.
 */
@Test
public void testMesosStreamIdNotPresentWhenNull() throws Exception {
    final URI schedulerUri = URI.create("http://localhost:12345/api/v1/scheduler");
    final UserAgent userAgent = new UserAgent(literal("testing", "latest"));
    final Func1<String, Observable<HttpClientRequest<ByteBuf>>> createPost = MesosClient.curryCreatePost(
            schedulerUri,
            StringMessageCodec.UTF8_STRING,
            StringMessageCodec.UTF8_STRING,
            userAgent,
            new AtomicReference<>(null)
    );

    final HttpClientRequest<ByteBuf> built = createPost.call("something").toBlocking().first();
    final Map<String, String> requestHeaders = headersToMap(built.getHeaders());

    assertThat(requestHeaders).doesNotContainKeys("Mesos-Stream-Id");
}
/**
 * Submits the same request twice and checks the listener's counters: two pool acquires,
 * (eventually) two releases, a single connection and a single pool reuse.
 */
@Test
public void testPoolReuse() throws Exception {
    LoadBalancingHttpClient<ByteBuf, ByteBuf> observableClient = RibbonTransport.newHttpClient(
            IClientConfig.Builder.newBuilder().withDefaultValues()
                    .withMaxAutoRetries(1)
                    .withMaxAutoRetriesNextServer(1).build());
    HttpClientRequest<ByteBuf> personRequest = HttpClientRequest.createGet(SERVICE_URI + "testAsync/person");

    // First round trip.
    Person firstResult = getPersonObservable(observableClient.submit(personRequest)).toBlocking().single();
    assertEquals(EmbeddedResources.defaultPerson, firstResult);

    // Second round trip with the very same request object.
    Person secondResult = getPersonObservable(observableClient.submit(personRequest)).toBlocking().single();
    assertEquals(EmbeddedResources.defaultPerson, secondResult);

    final HttpClientListener listener = observableClient.getListener();
    assertEquals(2, listener.getPoolAcquires());

    Func0<Boolean> bothReleased = new Func0<Boolean>() {
        @Override
        public Boolean call() {
            return listener.getPoolReleases() == 2;
        }
    };
    waitUntilTrueOrTimeout(1000, bothReleased);

    assertEquals(1, listener.getConnectionCount());
    assertEquals(1, listener.getPoolReuse());
}
/**
 * Streams persons through an SSE client whose load balancer lists three entries for a bad
 * server followed by the good one, and expects the full entity stream to be received.
 */
@Test
public void testStreamWithLoadBalancer() throws Exception {
    IClientConfig config = IClientConfig.Builder.newBuilder()
            .withRetryOnAllOperations(true)
            .withMaxAutoRetries(1)
            .withMaxAutoRetriesNextServer(3)
            .build();
    BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule());
    LoadBalancingHttpClient<ByteBuf, ServerSentEvent> sseClient =
            (LoadBalancingHttpClient<ByteBuf, ServerSentEvent>) RibbonTransport.newSSEClient(lb, config);
    HttpClientRequest<ByteBuf> streamRequest = HttpClientRequest.createGet("/testAsync/personStream");

    Server reachable = new Server("localhost:" + port);
    Server unreachable = new Server("localhost:12245");
    List<Server> servers = Lists.newArrayList(unreachable, unreachable, unreachable, reachable);
    lb.setServersList(servers);

    List<Person> received = getPersonListFromResponse(sseClient.submit(streamRequest, null, null));
    assertEquals(EmbeddedResources.entityStream, received);
}
/**
 * Builds a load-balancing HTTP client for the given configuration, wired with the bearer-header
 * execution listener, the default retry handler for {@code config}, and a pipeline that
 * aggregates HTTP objects up to {@code maxChunkSize}.
 *
 * @param config client configuration handed to the builder and the retry handler
 * @return the newly built client
 */
@Override
public HttpClient<ByteBuf, ByteBuf> newHttpClient(final IClientConfig config) {
    final List<ExecutionListener<HttpClientRequest<ByteBuf>, HttpClientResponse<ByteBuf>>> executionListeners = new ArrayList<>();
    executionListeners.add(createBearerHeaderAdder());

    final PipelineConfiguratorComposite<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>> aggregatingPipeline =
            new PipelineConfiguratorComposite<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>>(
                    new HttpClientPipelineConfigurator<ByteBuf, ByteBuf>(),
                    new HttpObjectAggregationConfigurator(maxChunkSize));

    return LoadBalancingHttpClient.<ByteBuf, ByteBuf>builder()
            .withClientConfig(config)
            .withExecutorListeners(executionListeners)
            .withRetryHandler(getDefaultHttpRetryHandlerWithConfig(config))
            .withPipelineConfigurator(aggregatingPipeline)
            .withPoolCleanerScheduler(RibbonTransport.poolCleanerScheduler)
            .build();
}
/**
 * Issues a GET against the external health-check URL.
 *
 * <p>Host, port and path are taken from {@code externalHealthCheckURL}. If the URL cannot be
 * parsed, the defaults ({@code localhost}, {@code DEFAULT_APPLICATION_PORT},
 * {@code /healthcheck}) are used instead.
 *
 * @param externalHealthCheckURL the URL of the health-check endpoint
 * @return the observable returned by submitting the GET request
 */
private Observable<HttpClientResponse<ByteBuf>> getResponse(String externalHealthCheckURL) {
    String host = "localhost";
    int port = DEFAULT_APPLICATION_PORT;
    String path = "/healthcheck";
    try {
        URL url = new URL(externalHealthCheckURL);
        host = url.getHost();

        // URL.getPort() is -1 when the URL carries no explicit port; fall back to the
        // protocol's default port, and keep the application default if that is unknown too.
        int resolvedPort = url.getPort();
        if (resolvedPort == -1) {
            resolvedPort = url.getDefaultPort();
        }
        if (resolvedPort != -1) {
            port = resolvedPort;
        }

        // A URL such as "http://host:8080" has an empty path; request the root instead of
        // sending an empty request target.
        String urlPath = url.getPath();
        path = urlPath.isEmpty() ? "/" : urlPath;
    } catch (MalformedURLException ignored) {
        // Deliberate best effort: an unparsable URL falls back to the defaults above.
    }
    Integer timeout = DynamicProperty.getInstance("prana.host.healthcheck.timeout").getInteger(DEFAULT_CONNECTION_TIMEOUT);
    HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
            .channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)
            .build();
    return httpClient.submit(HttpClientRequest.createGet(path));
}
/**
 * Submits the request and blocks for the first content chunk, decoded with the platform
 * default charset. Errors are rethrown from the error handler.
 *
 * @param request the request to submit
 * @param client  the client used to submit it
 * @return the first emitted content chunk as a string
 */
public static String getResponse(HttpClientRequest<ByteBuf> request, HttpClient<ByteBuf, ByteBuf> client) {
    final Func1<ByteBuf, String> decodeChunk = new Func1<ByteBuf, String>() {
        @Override
        public String call(ByteBuf byteBuf) {
            return byteBuf.toString(Charset.defaultCharset());
        }
    };
    final Func1<HttpClientResponse<ByteBuf>, Observable<String>> readBody =
            new Func1<HttpClientResponse<ByteBuf>, Observable<String>>() {
                @Override
                public Observable<String> call(HttpClientResponse<ByteBuf> response) {
                    return response.getContent().map(decodeChunk);
                }
            };
    final Func1<OnErrorThrowable, Observable<String>> rethrow =
            new Func1<OnErrorThrowable, Observable<String>>() {
                @Override
                public Observable<String> call(OnErrorThrowable onErrorThrowable) {
                    throw onErrorThrowable;
                }
            };

    return client.submit(request)
            .flatMap(readBody)
            .onErrorFlatMap(rethrow)
            .toBlocking()
            .first();
}
/**
 * Creates the command and stores every collaborator in the field of the same name.
 *
 * @param httpClient        client stored for later use
 * @param httpRequest       request stored for later use
 * @param hystrixCacheKey   cache key stored for later use
 * @param requestProperties request properties stored for later use
 * @param fallbackHandler   fallback handler stored for later use
 * @param validator         response validator stored for later use
 * @param classType         target class stored for later use
 * @param setter            Hystrix setter passed to the superclass constructor
 */
public HttpResourceObservableCommand(HttpClient<ByteBuf, ByteBuf> httpClient,
                                     HttpClientRequest<ByteBuf> httpRequest, String hystrixCacheKey,
                                     Map<String, Object> requestProperties,
                                     FallbackHandler<T> fallbackHandler,
                                     ResponseValidator<HttpClientResponse<ByteBuf>> validator,
                                     Class<? extends T> classType,
                                     HystrixObservableCommand.Setter setter) {
    super(setter);
    // Assignments follow the parameter order.
    this.httpClient = httpClient;
    this.httpRequest = httpRequest;
    this.hystrixCacheKey = hystrixCacheKey;
    this.requestProperties = requestProperties;
    this.fallbackHandler = fallbackHandler;
    this.validator = validator;
    this.classType = classType;
}
/**
 * Posts a JSON-serialized person as a {@code ByteBuf} body and expects the service to answer
 * with an equal person.
 */
@Test
public void testPostWithByteBuf() throws Exception {
    Person expected = new Person("netty", 5);
    byte[] json = new ObjectMapper().writeValueAsBytes(expected);

    HttpClientRequest<ByteBuf> postRequest = HttpClientRequest.createPost(SERVICE_URI + "testAsync/person")
            .withHeader("Content-type", "application/json")
            .withHeader("Content-length", String.valueOf(json.length))
            .withContent(Unpooled.copiedBuffer(json));

    LoadBalancingHttpClient<ByteBuf, ByteBuf> observableClient = RibbonTransport.newHttpClient(
            DefaultClientConfigImpl.getClientConfigWithDefaultValues().set(CommonClientConfigKey.ReadTimeout, 10000));

    Person echoed = getPersonObservable(observableClient.submit(postRequest)).toBlocking().single();
    assertEquals(expected, echoed);
}
/**
 * Creates the client from a builder.
 *
 * <p>A request-id provider is created only when the {@code RequestIdHeaderName} property is
 * set. The default load-balancer command is built from the builder's config, retry handler
 * and a copy of its listeners.
 *
 * @param builder source of the load balancer, configuration, retry handler, pipeline
 *                configurator, scheduler, listeners and policies
 */
protected LoadBalancingHttpClient(Builder<I, O> builder) {
    super(builder.lb, builder.config, new RequestSpecificRetryHandler(true, true, builder.retryHandler, null), builder.pipelineConfigurator, builder.poolCleanerScheduler);

    requestIdHeaderName = getProperty(IClientConfigKey.Keys.RequestIdHeaderName, null, null);
    if (requestIdHeaderName == null) {
        requestIdProvider = null;
    } else {
        requestIdProvider = new HttpRequestIdProvider(requestIdHeaderName, RxContexts.DEFAULT_CORRELATOR);
    }

    // Copy the listeners so the command builder works on this client's own list.
    this.listeners = new CopyOnWriteArrayList<ExecutionListener<HttpClientRequest<I>, HttpClientResponse<O>>>(builder.listeners);
    defaultCommandBuilder = LoadBalancerCommand.<HttpClientResponse<O>>builder()
            .withLoadBalancerContext(lbContext)
            .withListeners(this.listeners)
            .withClientConfig(builder.config)
            .withRetryHandler(builder.retryHandler)
            .build();

    this.responseToErrorPolicy = builder.responseToErrorPolicy;
    this.backoffStrategy = builder.backoffStrategy;
}
/**
 * Stores the load parameters, builds the HTTP client (15000 max connections, one-minute read
 * timeout), attaches a connection-pool metrics listener and prepares the GET observable
 * whose outcomes are tallied in {@code counter}.
 *
 * @param host         target host name
 * @param port         target port
 * @param firstStep    stored in the field of the same name
 * @param stepSize     stored in the field of the same name
 * @param stepDuration stored in the field of the same name; printed with an "s" suffix
 * @param query        request URI used for the GET
 */
public WSClient(String host, int port, int firstStep, int stepSize, int stepDuration, String query) {
    this.host = host;
    this.port = port;
    this.firstStep = firstStep;
    this.stepSize = stepSize;
    this.stepDuration = stepDuration;
    this.query = query;

    System.out.println("Starting client with hostname: " + host + " port: " + port + " first-step: " + firstStep + " step-size: " + stepSize + " step-duration: " + stepDuration + "s query: " + query);

    httpClient = new HttpClientBuilder<ByteBuf, ByteBuf>(this.host, this.port)
            .withMaxConnections(15000)
            .config(new HttpClient.HttpClientConfig.Builder().readTimeout(1, TimeUnit.MINUTES).build())
            .build();
    stats = new ConnectionPoolMetricListener();
    httpClient.subscribe(stats);

    client = httpClient.submit(HttpClientRequest.createGet(this.query))
            .flatMap(response -> {
                // Only a plain 200 counts as success; every other status is an HTTP error.
                boolean ok = response.getStatus().code() == 200;
                counter.increment(ok ? CounterEvent.SUCCESS : CounterEvent.HTTP_ERROR);
                return response.getContent()
                        .doOnNext(chunk -> counter.add(CounterEvent.BYTES, chunk.readableBytes()));
            })
            .doOnError(failure -> {
                boolean poolExhausted = failure instanceof PoolExhaustedException;
                counter.increment(poolExhausted ? CounterEvent.POOL_EXHAUSTED : CounterEvent.NETTY_ERROR);
            });
}
/**
 * Creates and reads a document, then checks that at least one captured GET request for that
 * document carries a non-empty session-token header.
 *
 * @param isNameBased whether collection and document links are name based
 */
@Test(groups = { "simple" }, timeOut = TIMEOUT, dataProvider = "sessionTestArgProvider")
public void sessionTokenInDocumentRead(boolean isNameBased) throws UnsupportedEncodingException {
    Document document = new Document();
    document.setId(UUID.randomUUID().toString());
    document.set("pk", "pk");
    document = spyClient.createDocument(getCollectionLink(isNameBased), document, null, false)
            .toBlocking()
            .single()
            .getResource();

    final String documentLink = getDocumentLink(document, isNameBased);
    spyClient.readDocument(documentLink, null).toBlocking().single().getResource();

    final String expectedLink = StringUtils.removeEnd(documentLink, "/");
    List<HttpClientRequest<ByteBuf>> documentReadHttpRequests = spyClient.getCapturedRequests().stream()
            .filter(captured -> captured.getMethod() == HttpMethod.GET)
            .filter(captured -> {
                try {
                    // '+' is escaped first so that URL decoding does not turn it into a space.
                    String decodedUri = URLDecoder.decode(captured.getUri().replaceAll("\\+", "%2b"), "UTF-8");
                    return decodedUri.contains(expectedLink);
                } catch (UnsupportedEncodingException e) {
                    return false;
                }
            })
            .collect(Collectors.toList());

    // Direct mode may make more than one call (multiple replicas)
    assertThat(documentReadHttpRequests.size() >= 1).isTrue();
    assertThat(documentReadHttpRequests.get(0).getHeaders().get(HttpConstants.HttpHeaders.SESSION_TOKEN)).isNotEmpty();
}
/**
 * Validates the context and execution info and tracks server switches: the first server is
 * only remembered, while a change to a different server also resets the per-server attempt
 * count and bumps the server counter.
 */
@Override
public void onStartWithServer(ExecutionContext<HttpClientRequest<I>> context, ExecutionInfo info) {
    checkContext(context);

    final boolean firstServer = lastServer == null;
    if (firstServer || !lastServer.equals(info.getServer())) {
        lastServer = info.getServer();
        if (!firstServer) {
            // Moved on to a different server: restart attempt counting for it.
            numAttemptsOnServer.set(0);
            numServers.incrementAndGet();
        }
    }

    checkExecutionInfo(info);
    startWithServerCounter.incrementAndGet();
}
/**
 * (Ignored) Requests the redirect endpoint with redirect-following enabled and expects the
 * default person as the final result.
 */
@Test
@Ignore
public void testRedirect() throws Exception {
    LoadBalancingHttpClient<ByteBuf, ByteBuf> observableClient =
            RibbonTransport.newHttpClient(
                    IClientConfig.Builder.newBuilder().withDefaultValues()
                            .withFollowRedirects(true)
                            .build());
    HttpClientRequest<ByteBuf> redirectRequest =
            HttpClientRequest.createGet(SERVICE_URI + "testAsync/redirect?port=" + port);

    Observable<HttpClientResponse<ByteBuf>> redirected =
            observableClient.submit(new Server(host, port), redirectRequest);
    Person fetched = getPersonObservable(redirected).toBlocking().single();
    assertEquals(EmbeddedResources.defaultPerson, fetched);
}
/**
 * With one retry on the same server and none on the next server, a request that first hits
 * the bad server must fail with a {@code ClientException} after two attempts on that server,
 * without ever reaching the good server.
 */
@Test
public void testObservableWithRetrySameServer() throws Exception {
    IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues()
            .withProperty(CommonClientConfigKey.ConnectTimeout, "1000");

    Server goodServer = new Server("localhost:" + port);
    Server badServer = new Server("localhost:12345");
    List<Server> servers = Lists.newArrayList(badServer, badServer, goodServer);
    BaseLoadBalancer lb = LoadBalancerBuilder.<Server>newBuilder()
            .withRule(new AvailabilityFilteringRule())
            .withPing(new DummyPing())
            .buildFixedServerListLoadBalancer(servers);

    LoadBalancingHttpClient<ByteBuf, ByteBuf> lbObservables = RibbonTransport.newHttpClient(lb, config,
            new NettyHttpLoadBalancerErrorHandler(1, 0, true));

    HttpClientRequest<ByteBuf> personRequest = HttpClientRequest.createGet("/testAsync/person");
    ObserverWithLatch<Person> observer = new ObserverWithLatch<Person>();
    getPersonObservable(lbObservables.submit(personRequest)).subscribe(observer);
    observer.await();

    assertNull(observer.obj);
    assertTrue(observer.error instanceof ClientException);

    // two requests to bad server because retry same server is set to 1
    ServerStats badStats = lbObservables.getServerStats(badServer);
    assertEquals(2, badStats.getTotalRequestsCount());
    assertEquals(0, badStats.getActiveRequestsCount());

    ServerStats goodStats = lbObservables.getServerStats(goodServer);
    assertEquals(0, goodStats.getTotalRequestsCount());
}
/**
 * Configures a 100 ms read timeout and expects the read-timeout endpoint to fail with a
 * Netty {@code ReadTimeoutException}.
 */
@Test
public void testReadTimeout() throws Exception {
    LoadBalancingHttpClient<ByteBuf, ByteBuf> observableClient = RibbonTransport.newHttpClient(
            DefaultClientConfigImpl.getClientConfigWithDefaultValues()
                    .withProperty(CommonClientConfigKey.ReadTimeout, "100"));
    HttpClientRequest<ByteBuf> slowRequest = HttpClientRequest.createGet(SERVICE_URI + "testAsync/readTimeout");

    ObserverWithLatch<HttpClientResponse<ByteBuf>> observer = new ObserverWithLatch<HttpClientResponse<ByteBuf>>();
    observableClient.submit(slowRequest).subscribe(observer);
    observer.await();

    assertTrue(observer.error instanceof io.netty.handler.timeout.ReadTimeoutException);
}
/**
 * Requests the database account from the given service endpoint.
 *
 * <p>The request carries version, user-agent, API-type and authorization headers. When no
 * resource token is in use and a token provider is available, the authorization value is a
 * key signature computed over an {@code x-date} header that is also sent with the request.
 *
 * @param serviceEndpoint the endpoint to query; used for the request URI, the authorization
 *                        signature and the server the request is sent to
 * @return a single emitting the database account read from the response
 */
private Single<DatabaseAccount> getDatabaseAccountAsync(URI serviceEndpoint) {
    // Build the request from the endpoint passed in. The previous code used the field
    // this.serviceEndpoint here, while the signature and server info below used the
    // parameter, so the two could disagree.
    HttpClientRequest<ByteBuf> httpRequest = HttpClientRequest.create(HttpMethod.GET,
            serviceEndpoint.toString());

    httpRequest.withHeader(HttpConstants.HttpHeaders.VERSION, HttpConstants.Versions.CURRENT_VERSION);

    UserAgentContainer userAgentContainer = new UserAgentContainer();
    String userAgentSuffix = this.connectionPolicy.getUserAgentSuffix();
    if (userAgentSuffix != null && userAgentSuffix.length() > 0) {
        userAgentContainer.setSuffix(userAgentSuffix);
    }

    httpRequest.withHeader(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent());
    httpRequest.withHeader(HttpConstants.HttpHeaders.API_TYPE, Constants.Properties.SQL_API_TYPE);

    final String authorizationToken;
    if (this.hasAuthKeyResourceToken || baseAuthorizationTokenProvider == null) {
        authorizationToken = HttpUtils.urlEncode(this.authKeyResourceToken);
    } else {
        // Retrieve the document service properties.
        String xDate = Utils.nowAsRFC1123();
        httpRequest.withHeader(HttpConstants.HttpHeaders.X_DATE, xDate);
        Map<String, String> header = new HashMap<>();
        header.put(HttpConstants.HttpHeaders.X_DATE, xDate);
        authorizationToken = baseAuthorizationTokenProvider
                .generateKeyAuthorizationSignature(HttpConstants.HttpMethods.GET, serviceEndpoint, header);
    }
    httpRequest.withHeader(HttpConstants.HttpHeaders.AUTHORIZATION, authorizationToken);

    RxClient.ServerInfo serverInfo = new RxClient.ServerInfo(serviceEndpoint.getHost(), serviceEndpoint.getPort());
    Observable<HttpClientResponse<ByteBuf>> clientResponseObservable = this.httpClient.submit(serverInfo,
            httpRequest);
    return toDatabaseAccountObservable(clientResponseObservable.toSingle());
}
/**
 * Invokes a Create-Document store call against a mocked HTTP client and checks the default
 * headers of the single captured request: user-agent suffix, cache-control, accept and
 * version.
 */
@Test(groups = "unit")
public void validateDefaultHeaders() {
    HttpClientResponse<ByteBuf> mockedResponse = new HttpClientMockWrapper.HttpClientBehaviourBuilder()
            .withContent("").withStatus(200)
            .withHeaders(EmptyHttpHeaders.INSTANCE)
            .asHttpClientResponse();
    HttpClientMockWrapper httpClientMockWrapper = new HttpClientMockWrapper(mockedResponse);

    UserAgentContainer userAgentContainer = new UserAgentContainer();
    userAgentContainer.setSuffix("i am suffix");

    HttpTransportClient transportClient = getHttpTransportClientUnderTest(100,
            userAgentContainer,
            httpClientMockWrapper.getClient());

    RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName(
            OperationType.Create, "dbs/db/colls/col", ResourceType.Document);
    request.setContentBytes(new byte[0]);

    transportClient.invokeStoreAsync(Uri.create(physicalAddress),
            new ResourceOperation(OperationType.Create, ResourceType.Document),
            request).toBlocking().value();

    assertThat(httpClientMockWrapper.getCapturedInvocation()).asList().hasSize(1);
    ImmutablePair<HttpClientRequest<ByteBuf>, RxClient.ServerInfo> httpClientInvocation = httpClientMockWrapper.getCapturedInvocation().get(0);
    HttpClientRequest<ByteBuf> sentRequest = httpClientInvocation.left;

    assertThat(sentRequest.getHeaders().get(HttpConstants.HttpHeaders.USER_AGENT)).endsWith("i am suffix");
    assertThat(sentRequest.getHeaders().get(HttpConstants.HttpHeaders.CACHE_CONTROL)).isEqualTo("no-cache");
    assertThat(sentRequest.getHeaders().get(HttpConstants.HttpHeaders.ACCEPT)).isEqualTo("application/json");
    assertThat(sentRequest.getHeaders().get(HttpConstants.HttpHeaders.VERSION)).isEqualTo(HttpConstants.Versions.CURRENT_VERSION);
}
/**
 * Verifies that the context carries the expected request config, request and
 * {@code "MyObject"} entry, and that the same context instance is seen on every call.
 * A failed check is printed and recorded by clearing the {@code checkContext} flag rather
 * than being propagated.
 *
 * @param context the execution context handed to the listener callback
 */
private void checkContext(ExecutionContext<HttpClientRequest<I>> context) {
    try {
        assertSame(requestConfig, context.getRequestConfig());
        assertSame(expectedRequest, context.getRequest());
        assertEquals(MY_OBJECT, context.get("MyObject"));

        if (this.context != null) {
            assertSame(this.context, context);
        } else {
            // First callback: remember the instance for later identity checks.
            this.context = context;
        }
    } catch (Throwable failure) {
        failure.printStackTrace();
        checkContext = false;
    }
}
/**
 * Before the request goes to a server: if a Keycloak security context is associated, adds
 * its token as a {@code Bearer} authorization header and stashes the security context in
 * the execution context; otherwise clears the association.
 */
@Override
public void onStartWithServer(ExecutionContext<HttpClientRequest<ByteBuf>> context, ExecutionInfo info) throws AbortExecutionException {
    final KeycloakSecurityContext current = KeycloakSecurityContextAssociation.get();
    if (current == null) {
        KeycloakSecurityContextAssociation.disassociate();
        return;
    }

    HttpClientRequest<ByteBuf> outgoing = context.getRequest();
    outgoing.withHeader("Authorization", "Bearer " + current.getTokenString());
    // Keep the security context in the execution context for the completion callbacks.
    context.put(KeycloakSecurityContextAssociation.class.getName(), current);
}
/**
 * After a failed attempt on a server: re-associates the security context stored in the
 * execution context, or clears the association when none was stored.
 */
@Override
public void onExceptionWithServer(ExecutionContext<HttpClientRequest<ByteBuf>> context, Throwable exception, ExecutionInfo info) {
    final String contextKey = KeycloakSecurityContextAssociation.class.getName();
    final KeycloakSecurityContext stored = (KeycloakSecurityContext) context.get(contextKey);
    if (stored == null) {
        KeycloakSecurityContextAssociation.disassociate();
        return;
    }
    KeycloakSecurityContextAssociation.associate(stored);
}
/**
 * After a successful execution: re-associates the security context stored in the execution
 * context, or clears the association when none was stored.
 */
@Override
public void onExecutionSuccess(ExecutionContext<HttpClientRequest<ByteBuf>> context, HttpClientResponse<ByteBuf> response, ExecutionInfo info) {
    final String contextKey = KeycloakSecurityContextAssociation.class.getName();
    final KeycloakSecurityContext stored = (KeycloakSecurityContext) context.get(contextKey);
    if (stored == null) {
        KeycloakSecurityContextAssociation.disassociate();
        return;
    }
    KeycloakSecurityContextAssociation.associate(stored);
}
/**
 * Builds a load-balancing HTTP client from the given pipeline configurator, client
 * configuration and retry handler, using the shared pool-cleaner scheduler.
 *
 * @param pipelineConfigurator pipeline configurator handed to the builder
 * @param config               client configuration handed to the builder
 * @param retryHandler         retry handler handed to the builder
 * @param <I>                  request content type
 * @param <O>                  response content type
 * @return the newly built client
 */
public static <I, O> LoadBalancingHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
                                                                 IClientConfig config, RetryHandler retryHandler) {
    LoadBalancingHttpClient<I, O> builtClient = LoadBalancingHttpClient.<I, O>builder()
            .withClientConfig(config)
            .withRetryHandler(retryHandler)
            .withPipelineConfigurator(pipelineConfigurator)
            .withPoolCleanerScheduler(poolCleanerScheduler)
            .build();
    return builtClient;
}