下面列出了怎么用io.reactivex.netty.protocol.http.client.HttpClient的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Exposes an {@link HttpNewsService} bean that fetches news over HTTP.
 *
 * <p>Every invocation of the returned service creates a client for
 * {@code NEWS_SERVER_PORT}, issues a GET for the empty path and flattens each
 * received content buffer, parsed as a JSON array of {@link News}, into the
 * resulting stream.
 *
 * <p>NOTE(review): each content buffer is parsed as a complete JSON document;
 * presumably the payload always arrives in a single chunk. Confirm.
 *
 * @return the HTTP-backed news service
 * @throws RuntimeException (from the stream) wrapping the {@link IOException}
 *         raised when a buffer cannot be parsed
 */
@Bean
HttpNewsService externalNews() {
    // ObjectMapper is thread-safe once configured and expensive to build,
    // so one instance is shared by all chunks and all invocations.
    final ObjectMapper mapper = new ObjectMapper();
    return () -> HttpClient.newClient(new InetSocketAddress(NEWS_SERVER_PORT))
        .createGet("")
        .flatMap(HttpClientResponse::getContent)
        .flatMapIterable(bb -> {
            try {
                // JSON is UTF-8 on the wire; do not depend on the platform default charset.
                return mapper.readValue(
                    bb.toString(java.nio.charset.StandardCharsets.UTF_8),
                    new TypeReference<ArrayList<News>>() {}
                );
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
}
/**
 * Builds a {@link LoadBalancingHttpClient} for the given configuration.
 *
 * <p>The client is created with a single execution listener obtained from
 * {@code createBearerHeaderAdder()}, the retry handler derived from
 * {@code config}, a pipeline composed of the standard client configurator and
 * an aggregation configurator constructed with {@code maxChunkSize}, and the
 * shared {@code RibbonTransport.poolCleanerScheduler}.
 *
 * @param config client configuration handed to the builder and the retry handler
 * @return the newly built client
 */
@Override
public HttpClient<ByteBuf, ByteBuf> newHttpClient(final IClientConfig config) {
    final List<ExecutionListener<HttpClientRequest<ByteBuf>, HttpClientResponse<ByteBuf>>> executionListeners =
        new ArrayList<>();
    executionListeners.add(createBearerHeaderAdder());

    final PipelineConfiguratorComposite<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>> pipeline =
        new PipelineConfiguratorComposite<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>>(
            new HttpClientPipelineConfigurator<ByteBuf, ByteBuf>(),
            new HttpObjectAggregationConfigurator(maxChunkSize));

    return LoadBalancingHttpClient.<ByteBuf, ByteBuf>builder()
        .withClientConfig(config)
        .withExecutorListeners(executionListeners)
        .withRetryHandler(getDefaultHttpRetryHandlerWithConfig(config))
        .withPipelineConfigurator(pipeline)
        .withPoolCleanerScheduler(RibbonTransport.poolCleanerScheduler)
        .build();
}
/**
 * Creates a load-balancing HTTP client configured from {@code config}.
 *
 * <p>Registers the listener returned by {@code createBearerHeaderAdder()},
 * installs the default retry handler for {@code config}, and uses a composite
 * pipeline configurator (HTTP client configurator plus an
 * {@code HttpObjectAggregationConfigurator} built from {@code maxChunkSize}).
 *
 * @param config the client configuration to build from
 * @return a {@link LoadBalancingHttpClient} exposed as an {@link HttpClient}
 */
@Override
public HttpClient<ByteBuf, ByteBuf> newHttpClient(final IClientConfig config) {
    // Listener list handed to the builder; currently holds only the bearer-header adder.
    final List<ExecutionListener<HttpClientRequest<ByteBuf>, HttpClientResponse<ByteBuf>>> hooks = new ArrayList<>();
    hooks.add(createBearerHeaderAdder());

    final HttpClientPipelineConfigurator<ByteBuf, ByteBuf> httpStage =
        new HttpClientPipelineConfigurator<ByteBuf, ByteBuf>();
    final HttpObjectAggregationConfigurator aggregationStage =
        new HttpObjectAggregationConfigurator(maxChunkSize);
    final PipelineConfiguratorComposite<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>> composite =
        new PipelineConfiguratorComposite<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>>(
            httpStage, aggregationStage);

    final LoadBalancingHttpClient<ByteBuf, ByteBuf> balanced =
        LoadBalancingHttpClient.<ByteBuf, ByteBuf>builder()
            .withClientConfig(config)
            .withExecutorListeners(hooks)
            .withRetryHandler(getDefaultHttpRetryHandlerWithConfig(config))
            .withPipelineConfigurator(composite)
            .withPoolCleanerScheduler(RibbonTransport.poolCleanerScheduler)
            .build();
    return balanced;
}
/**
 * Submits a GET request to the external health-check endpoint.
 *
 * <p>Host, port and path are taken from {@code externalHealthCheckURL}. When the
 * URL cannot be parsed the defaults {@code localhost},
 * {@code DEFAULT_APPLICATION_PORT} and {@code /healthcheck} are used instead.
 * The connect timeout is read from the dynamic property
 * {@code prana.host.healthcheck.timeout}.
 *
 * @param externalHealthCheckURL absolute URL of the health-check endpoint
 * @return the response stream of the submitted GET request
 */
private Observable<HttpClientResponse<ByteBuf>> getResponse(String externalHealthCheckURL) {
    String host = "localhost";
    int port = DEFAULT_APPLICATION_PORT;
    String path = "/healthcheck";
    try {
        URL url = new URL(externalHealthCheckURL);
        host = url.getHost();
        // URL#getPort() returns -1 when the URL carries no explicit port; fall back to
        // the protocol default (80 for http) and only then to the application default.
        int resolvedPort = url.getPort();
        if (resolvedPort == -1) {
            resolvedPort = url.getDefaultPort();
        }
        if (resolvedPort != -1) {
            port = resolvedPort;
        }
        // An empty path (e.g. "http://host:8080") is not a valid request target.
        String urlPath = url.getPath();
        path = urlPath.isEmpty() ? "/" : urlPath;
    } catch (MalformedURLException ignored) {
        // Deliberate best effort: keep the defaults assigned above.
    }
    Integer timeout = DynamicProperty.getInstance("prana.host.healthcheck.timeout").getInteger(DEFAULT_CONNECTION_TIMEOUT);
    HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port)
        .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
        .channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)
        .build();
    return httpClient.submit(HttpClientRequest.createGet(path));
}
/**
 * Submits {@code request} through {@code client} and blocks for the first
 * content chunk of the response.
 *
 * <p>Each chunk is decoded with the platform default charset; only the first
 * decoded chunk is returned. Errors are rethrown unchanged by the error handler.
 *
 * @param request the request to submit
 * @param client  the client used to submit the request
 * @return the first content chunk of the response as a string
 */
public static String getResponse(HttpClientRequest<ByteBuf> request, HttpClient<ByteBuf, ByteBuf> client) {
    // Decodes a single content buffer into text.
    final Func1<ByteBuf, String> decodeChunk = new Func1<ByteBuf, String>() {
        @Override
        public String call(ByteBuf byteBuf) {
            return byteBuf.toString(Charset.defaultCharset());
        }
    };

    // Turns a response into the stream of its decoded content chunks.
    final Func1<HttpClientResponse<ByteBuf>, Observable<String>> readContent =
        new Func1<HttpClientResponse<ByteBuf>, Observable<String>>() {
            @Override
            public Observable<String> call(HttpClientResponse<ByteBuf> response) {
                return response.getContent().map(decodeChunk);
            }
        };

    // Propagates any error as-is.
    final Func1<OnErrorThrowable, Observable<String>> rethrow =
        new Func1<OnErrorThrowable, Observable<String>>() {
            @Override
            public Observable<String> call(OnErrorThrowable onErrorThrowable) {
                throw onErrorThrowable;
            }
        };

    return client.submit(request)
        .flatMap(readContent)
        .onErrorFlatMap(rethrow)
        .toBlocking()
        .first();
}
/**
 * Verifies that a {@link RibbonTransportFactory} obtained from Guice uses the
 * bound {@code MyClientConfigFactory}: the client created for "myClient" must
 * report the namespace "MyConfig".
 */
@Test
public void testTransportFactoryWithInjection() {
    // Bind both factories as singletons.
    final AbstractModule bindings = new AbstractModule() {
        @Override
        protected void configure() {
            bind(ClientConfigFactory.class).to(MyClientConfigFactory.class).in(Scopes.SINGLETON);
            bind(RibbonTransportFactory.class).to(DefaultRibbonTransportFactory.class).in(Scopes.SINGLETON);
        }
    };
    final Injector guice = Guice.createInjector(bindings);

    final RibbonTransportFactory factory = guice.getInstance(RibbonTransportFactory.class);
    final HttpClient<ByteBuf, ByteBuf> httpClient = factory.newHttpClient("myClient");

    final IClientConfig clientConfig = ((LoadBalancingHttpClient) httpClient).getClientConfig();
    assertEquals("MyConfig", clientConfig.getNameSpace());
}
/**
 * Creates the command and stores every collaborator it was given.
 *
 * @param httpClient        client used for the HTTP call
 * @param httpRequest       request to be executed
 * @param hystrixCacheKey   cache key stored for this command
 * @param requestProperties request properties stored for this command
 * @param fallbackHandler   fallback handler stored for this command
 * @param validator         response validator stored for this command
 * @param classType         class of the result type
 * @param setter            Hystrix setter passed to the superclass constructor
 */
public HttpResourceObservableCommand(HttpClient<ByteBuf, ByteBuf> httpClient,
                                     HttpClientRequest<ByteBuf> httpRequest, String hystrixCacheKey,
                                     Map<String, Object> requestProperties,
                                     FallbackHandler<T> fallbackHandler,
                                     ResponseValidator<HttpClientResponse<ByteBuf>> validator,
                                     Class<? extends T> classType,
                                     HystrixObservableCommand.Setter setter) {
    super(setter);

    // Transport.
    this.httpClient = httpClient;
    this.httpRequest = httpRequest;

    // Hystrix and request metadata.
    this.hystrixCacheKey = hystrixCacheKey;
    this.requestProperties = requestProperties;

    // Response handling.
    this.classType = classType;
    this.validator = validator;
    this.fallbackHandler = fallbackHandler;
}
/**
 * Opens a server-sent-event stream against {@code /hello} on localhost.
 *
 * <p>The response headers are passed to {@code printResponseHeader} and each
 * received event is converted to its string content.
 *
 * @return stream of event contents as strings
 */
private Observable<String> initializeStream() {
    final HttpClient<ByteBuf, ServerSentEvent> sseClient = RxNetty.createHttpClient(
        "localhost", port, PipelineConfigurators.<ByteBuf>clientSseConfigurator());

    return sseClient.submit(HttpClientRequest.createGet("/hello"))
        .flatMap(sseResponse -> {
            printResponseHeader(sseResponse);
            return sseResponse.getContent();
        })
        .map(ServerSentEvent::contentAsString);
}
/**
 * Issues a GET with a query string, expects no response content, and checks
 * that exactly one trace with a single GET {@link Producer} was recorded.
 */
@Test
public void testGET() throws InterruptedException, ExecutionException, TimeoutException {
    // Client bound to the local test server.
    final HttpClient<ByteBuf, ByteBuf> httpClient =
        HttpClient.newClient(new InetSocketAddress("127.0.0.1", server.getServerPort()));

    final Object responseBody = httpClient.createGet(PATH_1 + "?" + QUERY_1)
        .flatMap((HttpClientResponse<ByteBuf> response) ->
            response.getContent().map(chunk -> chunk.toString(Charset.defaultCharset())))
        .singleOrDefault(null)
        .toBlocking()
        .toFuture()
        .get(5, TimeUnit.SECONDS);
    assertNull(responseBody);

    // Wait for the trace to be reported, then verify it.
    Wait.until(() -> getApmMockServer().getTraces().size() == 1);
    assertEquals(1, getApmMockServer().getTraces().size());

    final List<Producer> foundProducers =
        NodeUtil.findNodes(getApmMockServer().getTraces().get(0).getNodes(), Producer.class);
    assertEquals("Expecting 1 producers", 1, foundProducers.size());

    final Producer producer = foundProducers.get(0);
    assertEquals(PATH_1, producer.getUri());
    assertEquals(QUERY_1, producer.getProperties(Constants.PROP_HTTP_QUERY).iterator().next().getValue());
    assertEquals("GET", producer.getOperation());
    assertEquals("GET", producer.getProperties("http_method").iterator().next().getValue());
}
/**
 * Issues a POST carrying {@code HELLO_THERE}, expects no response content, and
 * checks that exactly one trace with a single POST {@link Producer} was recorded.
 */
@Test
public void testPOST() throws InterruptedException, ExecutionException, TimeoutException {
    SocketAddress serverAddress = new InetSocketAddress("127.0.0.1", server.getServerPort());
    /*Create a new client for the server address*/
    HttpClient<ByteBuf, ByteBuf> client = HttpClient.newClient(serverAddress);
    // Requests are immutable: writeStringContent returns a NEW observable that carries the
    // body. Subscribing to the original request (as before) sent the POST without content.
    Observable<HttpClientResponse<ByteBuf>> req1 =
        client.createPost(PATH_2).writeStringContent(Observable.just(HELLO_THERE));
    Object result1 = req1
        .flatMap((HttpClientResponse<ByteBuf> resp) -> resp.getContent()
            .map(bb -> bb.toString(Charset.defaultCharset())))
        .singleOrDefault(null).toBlocking().toFuture().get(5, TimeUnit.SECONDS);
    assertNull(result1);
    Wait.until(() -> getApmMockServer().getTraces().size() == 1);
    // Check stored traces (including 1 for the test client)
    assertEquals(1, getApmMockServer().getTraces().size());
    List<Producer> producers = NodeUtil.findNodes(getApmMockServer().getTraces().get(0).getNodes(), Producer.class);
    assertEquals("Expecting 1 producers", 1, producers.size());
    Producer testProducer = producers.get(0);
    assertEquals(PATH_2, testProducer.getUri());
    assertTrue(testProducer.getProperties(Constants.PROP_HTTP_QUERY).isEmpty());
    assertEquals("POST", testProducer.getOperation());
    assertEquals("POST", testProducer.getProperties("http_method").iterator().next().getValue());
}
/**
 * Issues a PUT carrying {@code HELLO_THERE}, expects no response content, and
 * checks that exactly one trace with a single PUT {@link Producer} was recorded.
 */
@Test
public void testPUT() throws InterruptedException, ExecutionException, TimeoutException {
    SocketAddress serverAddress = new InetSocketAddress("127.0.0.1", server.getServerPort());
    /*Create a new client for the server address*/
    HttpClient<ByteBuf, ByteBuf> client = HttpClient.newClient(serverAddress);
    // Requests are immutable: writeStringContent returns a NEW observable that carries the
    // body. Subscribing to the original request (as before) sent the PUT without content.
    Observable<HttpClientResponse<ByteBuf>> req1 =
        client.createPut(PATH_3).writeStringContent(Observable.just(HELLO_THERE));
    Object result1 = req1
        .flatMap((HttpClientResponse<ByteBuf> resp) -> resp.getContent()
            .map(bb -> bb.toString(Charset.defaultCharset())))
        .singleOrDefault(null).toBlocking().toFuture().get(5, TimeUnit.SECONDS);
    assertNull(result1);
    Wait.until(() -> getApmMockServer().getTraces().size() == 1);
    // Check stored traces (including 1 for the test client)
    assertEquals(1, getApmMockServer().getTraces().size());
    List<Producer> producers = NodeUtil.findNodes(getApmMockServer().getTraces().get(0).getNodes(), Producer.class);
    assertEquals("Expecting 1 producers", 1, producers.size());
    Producer testProducer = producers.get(0);
    assertEquals(PATH_3, testProducer.getUri());
    assertTrue(testProducer.getProperties(Constants.PROP_HTTP_QUERY).isEmpty());
    assertEquals("PUT", testProducer.getOperation());
    assertEquals("PUT", testProducer.getProperties("http_method").iterator().next().getValue());
}
/**
 * Issues a GET with a query string, expects {@code HELLO_WORLD} as the single
 * response chunk, and checks that exactly one trace with a single GET
 * {@link Producer} was recorded.
 */
@Test
public void testGET() throws InterruptedException, ExecutionException, TimeoutException {
    // Client bound to the local test server.
    final HttpClient<ByteBuf, ByteBuf> httpClient =
        HttpClient.newClient(new InetSocketAddress("127.0.0.1", server.getServerPort()));

    final Object responseBody = httpClient.createGet(PATH_1 + "?" + QUERY_1)
        .flatMap((HttpClientResponse<ByteBuf> response) ->
            response.getContent().map(chunk -> chunk.toString(Charset.defaultCharset())))
        .single()
        .toBlocking()
        .toFuture()
        .get(5, TimeUnit.SECONDS);
    assertEquals(HELLO_WORLD, responseBody);

    // Wait for the trace to be reported, then verify it.
    Wait.until(() -> getApmMockServer().getTraces().size() == 1);
    assertEquals(1, getApmMockServer().getTraces().size());

    final List<Producer> foundProducers =
        NodeUtil.findNodes(getApmMockServer().getTraces().get(0).getNodes(), Producer.class);
    assertEquals("Expecting 1 producers", 1, foundProducers.size());

    final Producer producer = foundProducers.get(0);
    assertEquals(PATH_1, producer.getUri());
    assertEquals(QUERY_1, producer.getProperties(Constants.PROP_HTTP_QUERY).iterator().next().getValue());
    assertEquals("GET", producer.getOperation());
    assertEquals("GET", producer.getProperties("http_method").iterator().next().getValue());
}
/**
 * Issues a POST carrying {@code HELLO_THERE}, expects {@code HELLO_WORLD} as the
 * single response chunk, and checks that exactly one trace with a single POST
 * {@link Producer} was recorded.
 */
@Test
public void testPOST() throws InterruptedException, ExecutionException, TimeoutException {
    SocketAddress serverAddress = new InetSocketAddress("127.0.0.1", server.getServerPort());
    /*Create a new client for the server address*/
    HttpClient<ByteBuf, ByteBuf> client = HttpClient.newClient(serverAddress);
    // Requests are immutable: writeStringContent returns a NEW observable that carries the
    // body. Subscribing to the original request (as before) sent the POST without content.
    Observable<HttpClientResponse<ByteBuf>> req1 =
        client.createPost(PATH_2).writeStringContent(Observable.just(HELLO_THERE));
    Object result1 = req1
        .flatMap((HttpClientResponse<ByteBuf> resp) -> resp.getContent()
            .map(bb -> bb.toString(Charset.defaultCharset())))
        .single().toBlocking().toFuture().get(5, TimeUnit.SECONDS);
    assertEquals(HELLO_WORLD, result1);
    // Check stored traces (including 1 for the test client)
    Wait.until(() -> getApmMockServer().getTraces().size() == 1);
    assertEquals(1, getApmMockServer().getTraces().size());
    List<Producer> producers = NodeUtil.findNodes(getApmMockServer().getTraces().get(0).getNodes(), Producer.class);
    assertEquals("Expecting 1 producers", 1, producers.size());
    Producer testProducer = producers.get(0);
    assertEquals(PATH_2, testProducer.getUri());
    assertTrue(testProducer.getProperties(Constants.PROP_HTTP_QUERY).isEmpty());
    assertEquals("POST", testProducer.getOperation());
    assertEquals("POST", testProducer.getProperties("http_method").iterator().next().getValue());
}
/**
 * Issues a PUT carrying {@code HELLO_THERE}, expects {@code HELLO_WORLD} as the
 * single response chunk, and checks that exactly one trace with a single PUT
 * {@link Producer} was recorded.
 */
@Test
public void testPUT() throws InterruptedException, ExecutionException, TimeoutException {
    SocketAddress serverAddress = new InetSocketAddress("127.0.0.1", server.getServerPort());
    /*Create a new client for the server address*/
    HttpClient<ByteBuf, ByteBuf> client = HttpClient.newClient(serverAddress);
    // Requests are immutable: writeStringContent returns a NEW observable that carries the
    // body. Subscribing to the original request (as before) sent the PUT without content.
    Observable<HttpClientResponse<ByteBuf>> req1 =
        client.createPut(PATH_3).writeStringContent(Observable.just(HELLO_THERE));
    Object result1 = req1
        .flatMap((HttpClientResponse<ByteBuf> resp) -> resp.getContent()
            .map(bb -> bb.toString(Charset.defaultCharset())))
        .single().toBlocking().toFuture().get(5, TimeUnit.SECONDS);
    assertEquals(HELLO_WORLD, result1);
    // Check stored traces (including 1 for the test client)
    Wait.until(() -> getApmMockServer().getTraces().size() == 1);
    assertEquals(1, getApmMockServer().getTraces().size());
    List<Producer> producers = NodeUtil.findNodes(getApmMockServer().getTraces().get(0).getNodes(), Producer.class);
    assertEquals("Expecting 1 producers", 1, producers.size());
    Producer testProducer = producers.get(0);
    assertEquals(PATH_3, testProducer.getUri());
    assertTrue(testProducer.getProperties(Constants.PROP_HTTP_QUERY).isEmpty());
    assertEquals("PUT", testProducer.getOperation());
    assertEquals("PUT", testProducer.getProperties("http_method").iterator().next().getValue());
}
/**
 * Creates a subscriber holding the HTTP client and the request factory it was given.
 *
 * @param httpClient client stored on this subscriber
 * @param createPost function that turns a {@code Send} into a stream of POST requests
 */
SinkSubscriber(
    @NotNull final HttpClient<ByteBuf, ByteBuf> httpClient,
    @NotNull final Func1<Send, Observable<HttpClientRequest<ByteBuf>>> createPost
) {
    // Plain field capture; no validation or work happens here.
    this.createPost = createPost;
    this.httpClient = httpClient;
}
/**
 * Verifies that once the TCP proxy in front of the server is shut down, the
 * proxy's listen port no longer serves content.
 *
 * <p>A first request through the proxy must succeed with "Hello World"; after
 * shutdown, fetching the same URL must fail with an {@link IOException}.
 */
@Test
public void testConnectionTerminatedOnClose() throws Exception {
    final InetSocketAddress listenAddress = new InetSocketAddress("localhost", 0);
    final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getServerPort());
    final TcpSocketProxy proxy = new TcpSocketProxy(listenAddress, serverAddress);
    proxy.start();

    final int proxyPort = proxy.getListenPort();
    final HttpClient<ByteBuf, ByteBuf> proxiedClient = RxNetty.createHttpClient("localhost", proxyPort);

    // Round trip through the proxy while it is still running.
    final String firstBody = proxiedClient.submit(HttpClientRequest.createGet("/"))
        .flatMap(AbstractHttpContentHolder::getContent)
        .map(chunk -> chunk.toString(StandardCharsets.UTF_8))
        .toBlocking()
        .first();
    assertThat(firstBody).isEqualTo("Hello World");
    LOGGER.info("first request done");

    proxy.shutdown();
    if (!proxy.isShutdown()) {
        fail("proxy should have been shutdown");
    } else {
        proxy.close();
    }

    // The listen port must now refuse to serve content.
    try {
        final String proxyUrl = String.format("http://localhost:%d/", proxyPort);
        URI.create(proxyUrl).toURL().getContent();
        fail("Shouldn't have been able to get content");
    } catch (IOException e) {
        // expected
    }
}
/**
 * Builds a load balancer over the hosts registered for {@code targetVip}.
 *
 * <p>Every instance is mapped to a {@code Host} made of its first IPv4 address
 * and the first port returned by {@code getPorts()}. Host membership events are
 * turned into {@code HttpClientHolder} events, with holders tracked per host so
 * that a REMOVE event hands back the holder created earlier for that host.
 *
 * <p>NOTE(review): a REMOVE event for a host that was never added produces a
 * membership event with a {@code null} holder. Confirm the balancer tolerates it.
 *
 * @param targetVip VIP whose instances should be load balanced
 * @return load balancer named {@code targetVip + "-lb"}
 * @throws IllegalStateException (from the mapping function) when an instance
 *         exposes no IPv4 address
 */
public LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> forVip(String targetVip) {
    Observable<MembershipEvent<Host>> eurekaHostSource = membershipSource.forInterest(Interests.forVips(targetVip), instanceInfo -> {
        // findFirst avoids materialising a list and lets us fail with a descriptive
        // error instead of an IndexOutOfBoundsException when no IPv4 address exists.
        String ipAddress = instanceInfo.getDataCenterInfo()
            .getAddresses().stream()
            .filter(na -> na.getProtocolType() == ProtocolType.IPv4)
            .findFirst()
            .orElseThrow(() -> new IllegalStateException(
                "No IPv4 address registered for an instance of VIP " + targetVip))
            .getIpAddress();
        HashSet<ServicePort> servicePorts = instanceInfo.getPorts();
        ServicePort portToUse = servicePorts.iterator().next();
        return new Host(ipAddress, portToUse.getPort());
    });
    final Map<Host, HttpClientHolder<ByteBuf, ServerSentEvent>> hostVsHolders = new ConcurrentHashMap<>();
    String lbName = targetVip + "-lb";
    return LoadBalancers.newBuilder(eurekaHostSource.map(
        hostEvent -> {
            HttpClientHolder<ByteBuf, ServerSentEvent> holder;
            if (hostEvent.getType() == MembershipEvent.EventType.REMOVE) {
                holder = hostVsHolders.remove(hostEvent.getClient());
            } else {
                // Only ask the pool for a client when one is actually needed; a REMOVE
                // event must not trigger a client lookup for a host that is going away.
                HttpClient<ByteBuf, ServerSentEvent> client = clientPool.getClientForHost(hostEvent.getClient());
                holder = new HttpClientHolder<>(client);
                hostVsHolders.put(hostEvent.getClient(), holder);
            }
            return new MembershipEvent<>(hostEvent.getType(), holder);
        })).withWeightingStrategy(new LinearWeightingStrategy<>(new RxNettyPendingRequests<>()))
        .withName(lbName)
        .withFailureDetector(new RxNettyFailureDetector<>()).build();
}
/**
 * Builds a server-sent-event HTTP client for the given server.
 *
 * <p>The client uses this instance's {@code pipelineConfigurator}, the
 * configured connect timeout (falling back to
 * {@code DefaultClientConfigImpl.DEFAULT_CONNECT_TIMEOUT}) and a default
 * {@code HttpClientConfig}. A new client is built on every call.
 *
 * @param server server supplying host and port
 * @return the newly built client
 */
@Override
protected HttpClient<I, ServerSentEvent> getOrCreateRxClient(Server server) {
    final HttpClientBuilder<I, ServerSentEvent> sseBuilder =
        new HttpClientBuilder<I, ServerSentEvent>(server.getHost(), server.getPort())
            .pipelineConfigurator(pipelineConfigurator);

    final int connectTimeoutMillis = getProperty(
        IClientConfigKey.Keys.ConnectTimeout, null, DefaultClientConfigImpl.DEFAULT_CONNECT_TIMEOUT);
    final RxClient.ClientConfig defaultConfig = new HttpClientConfig.Builder().build();

    return sseBuilder
        .channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis)
        .config(defaultConfig)
        .build();
}
/**
 * Converts an {@link HttpClientRequest} into a {@link ServerOperation}.
 *
 * <p>When invoked for a server, the operation obtains the client for that
 * server, sets the request's host header to the server's host and submits the
 * request (with {@code rxClientConfig} when it is non-null). Responses whose
 * status is 4xx or 5xx are routed through {@code responseToErrorPolicy} together
 * with a backoff computed from a per-operation attempt counter; all other
 * responses are passed through unchanged.
 *
 * @param request        the request to execute; must not be null
 * @param rxClientConfig optional per-request client configuration, may be null
 * @return the operation wrapping the request
 */
protected ServerOperation<HttpClientResponse<O>> requestToOperation(final HttpClientRequest<I> request, final ClientConfig rxClientConfig) {
    Preconditions.checkNotNull(request);

    return new ServerOperation<HttpClientResponse<O>>() {
        // Number of error responses seen so far; feeds the backoff strategy.
        final AtomicInteger count = new AtomicInteger(0);

        @Override
        public Observable<HttpClientResponse<O>> call(Server server) {
            HttpClient<I, O> rxClient = getOrCreateRxClient(server);
            setHostHeader(request, server.getHost());

            Observable<HttpClientResponse<O>> responses = (rxClientConfig != null)
                ? rxClient.submit(request, rxClientConfig)
                : rxClient.submit(request);

            return responses.concatMap(new Func1<HttpClientResponse<O>, Observable<HttpClientResponse<O>>>() {
                @Override
                public Observable<HttpClientResponse<O>> call(HttpClientResponse<O> response) {
                    final int statusClass = response.getStatus().code() / 100;
                    if (statusClass != 4 && statusClass != 5) {
                        return Observable.just(response);
                    }
                    return responseToErrorPolicy.call(response, backoffStrategy.call(count.getAndIncrement()));
                }
            });
        }
    };
}
public WSClient(String host, int port, int firstStep, int stepSize, int stepDuration, String query) {
this.host = host;
this.port = port;
this.firstStep = firstStep;
this.stepSize = stepSize;
this.stepDuration = stepDuration;
this.query = query;
System.out.println("Starting client with hostname: " + host + " port: " + port + " first-step: " + firstStep + " step-size: " + stepSize + " step-duration: " + stepDuration + "s query: " + query);
httpClient = new HttpClientBuilder<ByteBuf, ByteBuf>(this.host, this.port)
.withMaxConnections(15000)
.config(new HttpClient.HttpClientConfig.Builder().readTimeout(1, TimeUnit.MINUTES).build())
.build();
stats = new ConnectionPoolMetricListener();
httpClient.subscribe(stats);
client = httpClient.submit(HttpClientRequest.createGet(this.query))
.flatMap(response -> {
if (response.getStatus().code() == 200) {
counter.increment(CounterEvent.SUCCESS);
} else {
counter.increment(CounterEvent.HTTP_ERROR);
}
return response.getContent().doOnNext(bb -> {
counter.add(CounterEvent.BYTES, bb.readableBytes());
});
}).doOnError((t) -> {
if (t instanceof PoolExhaustedException) {
counter.increment(CounterEvent.POOL_EXHAUSTED);
} else {
counter.increment(CounterEvent.NETTY_ERROR);
}
});
}
public TestRouteBasic(String backendHost, int backendPort) {
host = backendHost;
port = backendPort;
client = new HttpClientBuilder<ByteBuf, ByteBuf>(host, port)
.withMaxConnections(10000)
.config(new HttpClient.HttpClientConfig.Builder().readTimeout(1, TimeUnit.MINUTES).build())
.build();
stats = new ConnectionPoolMetricListener();
client.subscribe(stats);
}
/**
 * Creates a load balancer from eureka's host stream, picks a client from it and
 * issues a GET for "/".
 *
 * <p>The HTTP initial line and all response headers are printed to standard
 * output; the response content is returned chunk by chunk, decoded with the
 * platform default charset.
 *
 * @param eurekaHostSource stream of host membership events
 * @return the response content as a stream of strings
 */
public static Observable<String> createRequestFromLB(Observable<MembershipEvent<Host>> eurekaHostSource) {
    return LoadBalancers.fromHostSource(
            // Map each host event to an RxNetty HTTP client so that Ocelli manages the
            // client instances directly, saving a host-to-client lookup per request.
            // Omitting this map would let Ocelli manage hosts instead; the host returned
            // by LoadBalancer#choose() would then have to be turned into a client.
            eurekaHostSource.map(membership -> {
                final Host target = membership.getClient();
                // Typically a pool of clients is used so that the same host does not get
                // a new client each time; this example creates one to reduce verbosity.
                final HttpClient<ByteBuf, ByteBuf> rxClient =
                    RxNetty.createHttpClient(target.getHostName(), target.getPort());
                // Ocelli expects a MembershipEvent, so wrap the client into one.
                return new MembershipEvent<>(membership.getType(), rxClient);
            }))
        // Choose the best possible host for the next request.
        .choose()
        // Submit the request to the chosen client.
        .flatMap(chosen ->
            chosen.submit(HttpClientRequest.createGet("/"))
                // Print the HTTP initial line and headers, then hand back the content.
                .flatMap((Func1<HttpClientResponse<ByteBuf>, Observable<ByteBuf>>) resp -> {
                    final String initialLine = resp.getHttpVersion().text() + ' '
                        + resp.getStatus().code() + ' '
                        + resp.getStatus().reasonPhrase();
                    System.out.println(initialLine);

                    for (Map.Entry<String, String> entry : resp.getHeaders().entries()) {
                        System.out.println(entry.getKey() + ": " + entry.getValue());
                    }
                    // Line break after the headers.
                    System.out.println();

                    return resp.getContent();
                })
                // Convert the ByteBuf of each content chunk into a string.
                .map(chunk -> chunk.toString(Charset.defaultCharset())));
}
/**
 * Returns the HTTP client stored in this instance's {@code client} field.
 *
 * @return the stored client, exactly as held (no copy, no lazy creation)
 */
protected final HttpClient<ByteBuf, ByteBuf> getClient() {
return this.client;
}
/**
 * Returns the HTTP client held in the {@code client} field.
 *
 * @return the stored client
 */
public final HttpClient<ByteBuf, ByteBuf> getClient() {
return client;
}
/**
 * Records the supplied configuration in the {@code config} field and then
 * delegates client creation to the superclass.
 *
 * @param config configuration to remember and to build the client from
 * @return the client created by the superclass implementation
 */
@Override
public HttpClient<ByteBuf, ByteBuf> newHttpClient(IClientConfig config) {
// Stored before delegating, so the field is set even if the superclass call throws.
this.config = config;
return super.newHttpClient(config);
}
/**
 * Builds a new HTTP client for the given server.
 *
 * <p>The builder comes from {@code RxContexts}, with the request-id provider
 * when one is set. Connect and read timeouts are taken from the client
 * properties (falling back to the {@code DefaultClientConfigImpl} defaults), and
 * the follow-redirect flag is applied only when it is configured. Connection
 * pooling is set up when {@code isPoolEnabled()} is true and disabled otherwise.
 * When an SSL context factory is present, an SSL engine factory whose engines
 * run in client mode is installed.
 *
 * @param server server supplying host and port
 * @return the newly built client
 * @throws RuntimeException wrapping the {@code ClientSslSocketFactoryException}
 *         raised while setting up SSL
 */
@Override
protected HttpClient<I, O> createRxClient(Server server) {
    // With or without a request-id provider, depending on what is configured.
    final HttpClientBuilder<I, O> clientBuilder = (requestIdProvider == null)
        ? RxContexts.<I, O>newHttpClientBuilder(server.getHost(), server.getPort(),
            RxContexts.DEFAULT_CORRELATOR, pipelineConfigurator)
        : RxContexts.<I, O>newHttpClientBuilder(server.getHost(), server.getPort(),
            requestIdProvider, RxContexts.DEFAULT_CORRELATOR, pipelineConfigurator);

    Integer connectTimeout = getProperty(IClientConfigKey.Keys.ConnectTimeout, null, DefaultClientConfigImpl.DEFAULT_CONNECT_TIMEOUT);
    Integer readTimeout = getProperty(IClientConfigKey.Keys.ReadTimeout, null, DefaultClientConfigImpl.DEFAULT_READ_TIMEOUT);
    Boolean followRedirect = getProperty(IClientConfigKey.Keys.FollowRedirects, null, null);

    HttpClientConfig.Builder configBuilder =
        new HttpClientConfig.Builder().readTimeout(readTimeout, TimeUnit.MILLISECONDS);
    if (followRedirect != null) {
        // Leave the builder's own default untouched unless a value is configured.
        configBuilder.setFollowRedirect(followRedirect);
    }

    clientBuilder
        .channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
        .config(configBuilder.build());

    if (!isPoolEnabled()) {
        clientBuilder.withNoConnectionPooling();
    } else {
        clientBuilder
            .withConnectionPoolLimitStrategy(poolStrategy)
            .withIdleConnectionsTimeoutMillis(idleConnectionEvictionMills)
            .withPoolIdleCleanupScheduler(poolCleanerScheduler);
    }

    if (sslContextFactory != null) {
        try {
            SSLEngineFactory clientModeFactory =
                new DefaultFactories.SSLContextBasedFactory(sslContextFactory.getSSLContext()) {
                    @Override
                    public SSLEngine createSSLEngine(ByteBufAllocator allocator) {
                        SSLEngine engine = super.createSSLEngine(allocator);
                        engine.setUseClientMode(true);
                        return engine;
                    }
                };
            clientBuilder.withSslEngineFactory(clientModeFactory);
        } catch (ClientSslSocketFactoryException e) {
            throw new RuntimeException(e);
        }
    }

    return clientBuilder.build();
}
/**
 * Exposes the per-server client map for tests.
 *
 * @return the {@code rxClientCache} map itself (not a copy)
 */
@VisibleForTesting
Map<Server, HttpClient<I, O>> getRxClients() {
return rxClientCache;
}
/**
 * Creates an HTTP client for the given configuration by delegating to
 * {@code RibbonTransport.newHttpClient(config)}.
 *
 * @param config client configuration passed through unchanged
 * @return the client returned by {@code RibbonTransport}
 */
public HttpClient<ByteBuf, ByteBuf> newHttpClient(IClientConfig config) {
return RibbonTransport.newHttpClient(config);
}
/**
 * Creates an HTTP client for the named client.
 *
 * <p>A fresh configuration is obtained from {@code clientConfigFactory}, the
 * properties for {@code name} are loaded into it, and the result is passed to
 * {@link #newHttpClient(IClientConfig)}.
 *
 * @param name client name whose properties are loaded
 * @return the client built from the loaded configuration
 */
public final HttpClient<ByteBuf, ByteBuf> newHttpClient(String name) {
    final IClientConfig namedConfig = clientConfigFactory.newConfig();
    namedConfig.loadProperties(name);

    return newHttpClient(namedConfig);
}
/**
 * Returns the HTTP client held in the {@code client} field.
 *
 * @return the stored client
 */
public HttpClient<ByteBuf, ByteBuf> getClient() {
return client;
}