下面列出了 org.apache.http.impl.nio.client.HttpAsyncClients 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Default constructor.
 *
 * <p>Builds and starts the async HTTP client, then creates the Salt client, the Salt SSH
 * service and the default batch settings from {@code ConfigDefaults}.
 */
public SaltService() {
    // Connect, socket and connection-request timeouts are all set to 0
    // (HttpClient 4.x documents a zero timeout as "infinite").
    RequestConfig zeroTimeouts = RequestConfig.custom()
            .setConnectTimeout(0)
            .setSocketTimeout(0)
            .setConnectionRequestTimeout(0)
            .setCookieSpec(CookieSpecs.STANDARD)
            .build();

    // Single builder chain: default request config plus a pool capped at 20 connections.
    asyncHttpClient = HttpAsyncClients.custom()
            .setDefaultRequestConfig(zeroTimeouts)
            .setMaxConnPerRoute(20)
            .setMaxConnTotal(20)
            .build();
    asyncHttpClient.start();

    SALT_CLIENT = new SaltClient(SALT_MASTER_URI, new HttpAsyncClientImpl(asyncHttpClient));
    saltSSHService = new SaltSSHService(SALT_CLIENT, SaltActionChainGeneratorService.INSTANCE);

    defaultBatch = Batch.custom()
            .withBatchAsAmount(ConfigDefaults.get().getSaltBatchSize())
            .withDelay(ConfigDefaults.get().getSaltBatchDelay())
            .withPresencePingTimeout(ConfigDefaults.get().getSaltPresencePingTimeout())
            .withPresencePingGatherJobTimeout(
                    ConfigDefaults.get().getSaltPresencePingGatherJobTimeout())
            .build();
}
/**
 * Returns the shared async HTTP client, creating and starting it on the first call.
 *
 * <p>The method is synchronized so that concurrent first calls cannot create two clients,
 * and the shared field is only assigned after {@code start()} has returned, so callers never
 * receive a client that has not been started.
 *
 * <p>{@code serverConfig} is only consulted when the client is created; later calls return
 * the existing instance regardless of the configuration passed in.
 *
 * @param serverConfig configuration; when {@code isTrustAllCertificates()} is true the client
 *                     trusts every certificate and skips hostname verification
 * @return the started, shared client
 * @throws IOException if the trust-all SSL context cannot be built
 */
public static synchronized CloseableHttpAsyncClient getInstance(ServerConfig serverConfig)
        throws IOException {
    if (HTTPC_CLIENT == null) {
        CloseableHttpAsyncClient client;
        if (serverConfig.isTrustAllCertificates()) {
            try {
                SSLContext sslContext = SSLContexts.custom()
                        .loadTrustMaterial(null, TrustAllStrategy.INSTANCE).build();
                client = HttpAsyncClients.custom()
                        .setSSLHostnameVerifier((NoopHostnameVerifier.INSTANCE))
                        .setSSLContext(sslContext)
                        .build();
            } catch (KeyManagementException | NoSuchAlgorithmException
                    | KeyStoreException e) {
                throw new IOException(e);
            }
        } else {
            client = HttpAsyncClients.createDefault();
        }
        // Start first, publish second: the static field must never hold an unstarted client.
        client.start();
        HTTPC_CLIENT = client;
    }
    return HTTPC_CLIENT;
}
/**
 * Builds and starts an async HTTP client backed by a pooling connection manager, with all
 * tunables read from the given configuration.
 *
 * @param conf source of the timeout, charset and pool-size settings
 * @return a started client
 * @throws IOReactorException if the I/O reactor cannot be created
 */
private CloseableHttpAsyncClient createHttpAsyncClient(YunpianConf conf) throws IOReactorException {
    // I/O reactor: one thread per available processor, timeouts from configuration.
    IOReactorConfig reactorSettings = IOReactorConfig.custom()
            .setIoThreadCount(Runtime.getRuntime().availableProcessors())
            .setConnectTimeout(conf.getConfInt(YunpianConf.HTTP_CONN_TIMEOUT, "10000"))
            .setSoTimeout(conf.getConfInt(YunpianConf.HTTP_SO_TIMEOUT, "30000"))
            .build();
    PoolingNHttpClientConnectionManager pool =
            new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(reactorSettings));

    // Connection defaults: ignore malformed/unmappable input, use the configured charset.
    ConnectionConfig.Builder connectionDefaults = ConnectionConfig.custom();
    connectionDefaults.setMalformedInputAction(CodingErrorAction.IGNORE);
    connectionDefaults.setUnmappableInputAction(CodingErrorAction.IGNORE);
    connectionDefaults.setCharset(
            Charset.forName(conf.getConf(YunpianConf.HTTP_CHARSET, YunpianConf.HTTP_CHARSET_DEFAULT)));
    pool.setDefaultConnectionConfig(connectionDefaults.build());

    // Pool limits.
    pool.setMaxTotal(conf.getConfInt(YunpianConf.HTTP_CONN_MAXTOTAL, "100"));
    pool.setDefaultMaxPerRoute(conf.getConfInt(YunpianConf.HTTP_CONN_MAXPERROUTE, "10"));

    CloseableHttpAsyncClient started = HttpAsyncClients.custom().setConnectionManager(pool).build();
    started.start();
    return started;
}
/**
 * Example: sends a single GET request through an HTTP proxy and prints the status line.
 *
 * @param args unused
 * @throws Exception if the request fails or waiting for the response is interrupted
 */
public static void main(String[] args) throws Exception {
    CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
    try {
        httpclient.start();

        // The proxy is configured per request rather than on the client.
        RequestConfig viaProxy = RequestConfig.custom()
                .setProxy(new HttpHost("someproxy", 8080))
                .build();
        HttpGet request = new HttpGet("https://issues.apache.org/");
        request.setConfig(viaProxy);

        // Block until the response has arrived.
        HttpResponse response = httpclient.execute(request, null).get();
        System.out.println("Response: " + response.getStatusLine());
        System.out.println("Shutting down");
    } finally {
        httpclient.close();
    }
}
/**
 * Example: executes a GET with a custom response consumer that yields a {@code Boolean}
 * outcome, and prints whether the request succeeded.
 *
 * @param args unused
 * @throws Exception if execution fails or waiting for the result is interrupted
 */
public static void main(final String[] args) throws Exception {
    CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
    try {
        httpclient.start();
        Future<Boolean> pending = httpclient.execute(
                HttpAsyncMethods.createGet("http://localhost:8080/"),
                new MyResponseConsumer(),
                null);
        Boolean outcome = pending.get();
        // A null outcome counts as failure, same as FALSE.
        String verdict = Boolean.TRUE.equals(outcome)
                ? "Request successfully executed"
                : "Request failed";
        System.out.println(verdict);
        System.out.println("Shutting down");
    } finally {
        httpclient.close();
    }
    System.out.println("Done");
}
/**
 * Example: sends four GET requests to one host over a pipelining client and prints the
 * list of responses.
 *
 * @param args unused
 * @throws Exception if execution fails or waiting for the responses is interrupted
 */
public static void main(final String[] args) throws Exception {
    CloseableHttpPipeliningClient httpclient = HttpAsyncClients.createPipelining();
    try {
        httpclient.start();

        HttpHost target = new HttpHost("localhost", 8080);
        HttpGet[] pipeline = {
            new HttpGet("/docs/index.html"),
            new HttpGet("/docs/introduction.html"),
            new HttpGet("/docs/setup.html"),
            new HttpGet("/docs/config/index.html")
        };

        // All requests are submitted together; the future completes with every response.
        List<HttpResponse> responses =
                httpclient.execute(target, Arrays.<HttpRequest>asList(pipeline), null).get();
        System.out.println(responses);
        System.out.println("Shutting down");
    } finally {
        httpclient.close();
    }
    System.out.println("Done");
}
/**
 * Example: executes a GET request using a client configured with username/password
 * credentials and prints the response status line.
 *
 * @param args unused
 * @throws Exception if the request fails or waiting for the response is interrupted
 */
public static void main(String[] args) throws Exception {
    CredentialsProvider credsProvider = new BasicCredentialsProvider();
    // NOTE(review): the credentials are scoped to localhost:443 while the request below
    // targets plain http://localhost/ — confirm which of the two is intended.
    credsProvider.setCredentials(new AuthScope("localhost", 443),
            new UsernamePasswordCredentials("username", "password"));
    CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom()
            .setDefaultCredentialsProvider(credsProvider)
            .build();
    try {
        // The client has to be started before a request can be executed; the original
        // example omitted this call, unlike the other examples in this file.
        httpclient.start();
        HttpGet httpget = new HttpGet("http://localhost/");
        System.out.println("Executing request " + httpget.getRequestLine());
        Future<HttpResponse> future = httpclient.execute(httpget, null);
        HttpResponse response = future.get();
        System.out.println("Response: " + response.getStatusLine());
        System.out.println("Shutting down");
    } finally {
        httpclient.close();
    }
}
/**
 * Example: pipelines four GET requests to one host using explicit request producers and
 * response consumers, then waits for all of them to finish.
 *
 * @param args unused
 * @throws Exception if execution fails or waiting for completion is interrupted
 */
public static void main(final String[] args) throws Exception {
    CloseableHttpPipeliningClient httpclient = HttpAsyncClients.createPipelining();
    try {
        httpclient.start();

        HttpHost target = new HttpHost("localhost", 8080);
        HttpGet[] pipeline = {
            new HttpGet("/docs/index.html"),
            new HttpGet("/docs/introduction.html"),
            new HttpGet("/docs/setup.html"),
            new HttpGet("/docs/config/index.html")
        };

        // One producer/consumer pair per request, kept in matching order.
        List<MyRequestProducer> producers = new ArrayList<MyRequestProducer>();
        List<MyResponseConsumer> consumers = new ArrayList<MyResponseConsumer>();
        for (int i = 0; i < pipeline.length; i++) {
            HttpGet current = pipeline[i];
            producers.add(new MyRequestProducer(target, current));
            consumers.add(new MyResponseConsumer(current));
        }

        Future<List<Boolean>> pending = httpclient.execute(target, producers, consumers, null);
        pending.get();
        System.out.println("Shutting down");
    } finally {
        httpclient.close();
    }
    System.out.println("Done");
}
/**
 * Example: sends a GET request through a proxy, with username/password credentials
 * registered for that proxy's host and port, and prints the response status line.
 *
 * @param args unused
 * @throws Exception if the request fails or waiting for the response is interrupted
 */
public static void main(String[] args) throws Exception {
    // Credentials are registered for the same host/port used as the proxy below.
    CredentialsProvider proxyCredentials = new BasicCredentialsProvider();
    proxyCredentials.setCredentials(
            new AuthScope("someproxy", 8080),
            new UsernamePasswordCredentials("username", "password"));

    CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom()
            .setDefaultCredentialsProvider(proxyCredentials)
            .build();
    try {
        httpclient.start();

        RequestConfig viaProxy = RequestConfig.custom()
                .setProxy(new HttpHost("someproxy", 8080))
                .build();
        HttpGet httpget = new HttpGet("https://issues.apache.org/");
        httpget.setConfig(viaProxy);

        HttpResponse response = httpclient.execute(httpget, null).get();
        System.out.println("Response: " + response.getStatusLine());
        System.out.println("Shutting down");
    } finally {
        httpclient.close();
    }
}
/**
 * Creates and starts an async HTTP client that offers the service user's credentials for
 * any authentication scope and applies the timeouts defined in {@code Timeouts}.
 *
 * @return a started client
 */
@Override
public CloseableHttpAsyncClient generateClient ()
{
    // Default request settings: default cookie spec plus the shared timeout constants.
    RequestConfig.Builder requestDefaults = RequestConfig.custom();
    requestDefaults.setCookieSpec(CookieSpecs.DEFAULT);
    requestDefaults.setSocketTimeout(Timeouts.SOCKET_TIMEOUT);
    requestDefaults.setConnectTimeout(Timeouts.CONNECTION_TIMEOUT);
    requestDefaults.setConnectionRequestTimeout(Timeouts.CONNECTION_REQUEST_TIMEOUT);

    // One credential pair, registered for every scope.
    CredentialsProvider credentials = new BasicCredentialsProvider();
    UsernamePasswordCredentials account = new UsernamePasswordCredentials(serviceUser, servicePass);
    credentials.setCredentials(new AuthScope (AuthScope.ANY), account);

    HttpAsyncClientBuilder builder = HttpAsyncClients.custom ();
    builder.setDefaultCredentialsProvider (credentials);
    builder.setDefaultRequestConfig(requestDefaults.build());

    CloseableHttpAsyncClient client = builder.build ();
    client.start ();
    return client;
}
/**
 * Creates and starts an async HTTP client that offers this object's username/password for
 * any authentication scope and applies the timeouts defined in {@code Timeouts}.
 *
 * @return a started client
 */
@Override
public CloseableHttpAsyncClient generateClient ()
{
    // Default request settings: default cookie spec plus the shared timeout constants.
    RequestConfig.Builder requestDefaults = RequestConfig.custom();
    requestDefaults.setCookieSpec(CookieSpecs.DEFAULT);
    requestDefaults.setSocketTimeout(Timeouts.SOCKET_TIMEOUT);
    requestDefaults.setConnectTimeout(Timeouts.CONNECTION_TIMEOUT);
    requestDefaults.setConnectionRequestTimeout(Timeouts.CONNECTION_REQUEST_TIMEOUT);

    // One credential pair, registered for every scope.
    CredentialsProvider credentials = new BasicCredentialsProvider();
    UsernamePasswordCredentials account = new UsernamePasswordCredentials(username, password);
    credentials.setCredentials(new AuthScope (AuthScope.ANY), account);

    HttpAsyncClientBuilder builder = HttpAsyncClients.custom ();
    builder.setDefaultCredentialsProvider (credentials);
    builder.setDefaultRequestConfig(requestDefaults.build());

    CloseableHttpAsyncClient client = builder.build ();
    client.start ();
    return client;
}
/**
 * Builds an async HTTP client that routes requests through the given proxy.
 *
 * <p>The connection manager from {@code getAsyncConnectionManager()} is used when it is not
 * {@code null}, and the redirect settings are applied through {@code setRedirects}. This
 * method does not call {@code start()} on the returned client.
 *
 * @param proxyUri URI whose host and port identify the proxy
 * @return the built client
 */
private CloseableHttpAsyncClient getAsyncProxyHttpClient(URI proxyUri) {
    LOG.info("Creating async proxy http client");
    PoolingNHttpClientConnectionManager cm = getAsyncConnectionManager();
    HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom();
    if (cm != null) {
        clientBuilder = clientBuilder.setConnectionManager(cm);
    }
    // The proxy host is freshly constructed and therefore never null, so the original
    // "proxy != null" guard was dead code and has been dropped.
    clientBuilder = clientBuilder.setProxy(new HttpHost(proxyUri.getHost(), proxyUri.getPort()));
    clientBuilder = setRedirects(clientBuilder);
    return clientBuilder.build();
}
/**
 * Issues an async GET for {@code /hello2} against localhost and verifies that the response
 * status is 200.
 *
 * <p>The client is closed in a {@code finally} block so that it is released even when
 * starting, executing or waiting for the callback fails.
 *
 * @throws IllegalStateException if the response status code is not 200
 * @throws Exception if the request fails or waiting is interrupted
 */
@Override
public void transactionMarker() throws Exception {
    CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
    Future<HttpResponse> future;
    try {
        httpClient.start();
        HttpHost httpHost = new HttpHost("localhost", getPort());
        HttpGet httpGet = new HttpGet("/hello2");
        SimpleFutureCallback callback = new SimpleFutureCallback();
        future = httpClient.execute(httpHost, httpGet, callback);
        // Wait for the callback before closing, as the original did.
        callback.latch.await();
    } finally {
        httpClient.close();
    }
    int responseStatusCode = future.get().getStatusLine().getStatusCode();
    if (responseStatusCode != 200) {
        throw new IllegalStateException(
                "Unexpected response status code: " + responseStatusCode);
    }
}
/**
 * Issues an async POST to {@code /hello4} against localhost and verifies that the response
 * status is 200.
 *
 * <p>The client is closed in a {@code finally} block so that it is released even when
 * starting, executing or waiting for the callback fails.
 *
 * @throws IllegalStateException if the response status code is not 200
 * @throws Exception if the request fails or waiting is interrupted
 */
@Override
public void transactionMarker() throws Exception {
    CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
    Future<HttpResponse> future;
    try {
        httpClient.start();
        HttpHost httpHost = new HttpHost("localhost", getPort());
        HttpPost httpPost = new HttpPost("/hello4");
        SimpleFutureCallback callback = new SimpleFutureCallback();
        future = httpClient.execute(httpHost, httpPost, callback);
        // Wait for the callback before closing, as the original did.
        callback.latch.await();
    } finally {
        httpClient.close();
    }
    int responseStatusCode = future.get().getStatusLine().getStatusCode();
    if (responseStatusCode != 200) {
        throw new IllegalStateException(
                "Unexpected response status code: " + responseStatusCode);
    }
}
/**
 * Executes a GET with a cookie store supplied through the request's local context and
 * expects a 200 response.
 *
 * <p>The client is closed in a {@code finally} block so that a failed request or a failed
 * assertion does not leave it running.
 */
@Test
public void whenUseCookiesWithHttpAsyncClient_thenCorrect() throws Exception {
    final BasicCookieStore cookieStore = new BasicCookieStore();
    final BasicClientCookie cookie = new BasicClientCookie(COOKIE_NAME, "1234");
    cookie.setDomain(COOKIE_DOMAIN);
    cookie.setPath("/");
    cookieStore.addCookie(cookie);
    final CloseableHttpAsyncClient client = HttpAsyncClients.custom().build();
    try {
        client.start();
        final HttpGet request = new HttpGet(HOST_WITH_COOKIE);
        // The cookie store is attached to this request's context, not to the client.
        final HttpContext localContext = new BasicHttpContext();
        localContext.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
        final Future<HttpResponse> future = client.execute(request, localContext, null);
        final HttpResponse response = future.get();
        assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
    } finally {
        client.close();
    }
}
/**
 * Creates a test RPC client.
 *
 * <p>Stores the service URL and the procedure-to-REST-path map, and creates default sync and
 * async HTTP clients. The async client is only created here; this constructor does not
 * start it.
 *
 * @param serviceUrl           stored as the service URL
 * @param procedureRestPathMap stored as the procedure REST path map
 */
public TestRpcClient(String serviceUrl, Map<String, String> procedureRestPathMap) {
    // Addressing information.
    _procedureRestPathMap = procedureRestPathMap;
    _serviceUrl = serviceUrl;

    // HTTP transports with library defaults.
    _asyncClient = HttpAsyncClients.createDefault();
    _syncClient = HttpClients.createDefault();
}
/**
 * Sets up the two fixed-size worker pools and the async HTTP client used for fetching,
 * starts the client and then calls {@code start()}.
 */
public MetricFetcher() {
    // Both pools: 2 x processors threads, bounded queue, tasks discarded when the queue is full.
    int poolSize = Runtime.getRuntime().availableProcessors() * 2;
    long keepAliveMillis = 0;
    int queueCapacity = 2048;
    RejectedExecutionHandler discardWhenFull = new DiscardPolicy();

    fetchService = new ThreadPoolExecutor(
            poolSize, poolSize, keepAliveMillis, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(queueCapacity),
            new NamedThreadFactory("sentinel-dashboard-metrics-fetchService"),
            discardWhenFull);
    fetchWorker = new ThreadPoolExecutor(
            poolSize, poolSize, keepAliveMillis, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(queueCapacity),
            new NamedThreadFactory("sentinel-dashboard-metrics-fetchWorker"),
            discardWhenFull);

    IOReactorConfig reactorSettings = IOReactorConfig.custom()
            .setConnectTimeout(3000)
            .setSoTimeout(3000)
            .setIoThreadCount(Runtime.getRuntime().availableProcessors() * 2)
            .build();

    // Redirect strategy that reports no method as redirectable.
    DefaultRedirectStrategy neverRedirect = new DefaultRedirectStrategy() {
        @Override
        protected boolean isRedirectable(final String method) {
            return false;
        }
    };

    httpclient = HttpAsyncClients.custom()
            .setRedirectStrategy(neverRedirect)
            .setMaxConnTotal(4000)
            .setMaxConnPerRoute(1000)
            .setDefaultIOReactorConfig(reactorSettings)
            .build();
    httpclient.start();
    start();
}
/**
 * Creates and starts the async HTTP client used by this API client: 3000 ms connect timeout,
 * 10000 ms socket timeout, 2 x processors I/O threads, and a redirect strategy that reports
 * no method as redirectable.
 */
public SentinelApiClient() {
    IOReactorConfig reactorSettings = IOReactorConfig.custom()
            .setConnectTimeout(3000)
            .setSoTimeout(10000)
            .setIoThreadCount(Runtime.getRuntime().availableProcessors() * 2)
            .build();

    DefaultRedirectStrategy neverRedirect = new DefaultRedirectStrategy() {
        @Override
        protected boolean isRedirectable(final String method) {
            return false;
        }
    };

    httpClient = HttpAsyncClients.custom()
            .setRedirectStrategy(neverRedirect)
            .setMaxConnTotal(4000)
            .setMaxConnPerRoute(1000)
            .setDefaultIOReactorConfig(reactorSettings)
            .build();
    httpClient.start();
}
public AsyncHttpClient(){
httpclient= HttpAsyncClients
.custom()
.setDefaultRequestConfig(RequestConfig.custom()
.setConnectTimeout(600)
.setSocketTimeout(700)
.setConnectionRequestTimeout(500).build()).build();
httpclient.start();
}
/**
 * Creates and starts the shared Apache async HTTP client from the HTTP settings in the given
 * properties, and returns once that work has finished.
 *
 * <p>The client is built on a helper thread, as in the original code. Instead of meeting the
 * helper at a {@code CyclicBarrier}, the caller waits on the task's {@code Future}. With the
 * barrier the caller blocked forever whenever the helper failed before reaching it (for
 * example on the logged {@code IOReactorException} or on a malformed numeric setting). The
 * helper executor is now also shut down instead of being abandoned.
 *
 * <p>An {@code IOReactorException} is logged and swallowed as before, so in that case the
 * method returns without a client having been assigned.
 *
 * @param properties source of the HTTP client settings
 * @throws Exception if the helper task fails with anything other than
 *                   {@code IOReactorException}, or the wait is interrupted
 */
public void initialize(final NettyRpcProperties properties) throws Exception {
    final CommonProperties cp = properties.getCommonProperties();
    // NOTE(review): building the client off the calling thread is kept from the original;
    // presumably deliberate — confirm.
    final java.util.concurrent.ExecutorService bootstrap = Executors.newCachedThreadPool();
    try {
        bootstrap.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
                            .setIoThreadCount(CommonProperties.CPUS * 2)
                            .setConnectTimeout(Integer.parseInt(cp.getHttpConnectTimeout()))
                            .setSoTimeout(Integer.parseInt(cp.getHttpSocketTimeout()))
                            .setSndBufSize(Integer.parseInt(cp.getHttpSendBufSize()))
                            .setRcvBufSize(Integer.parseInt(cp.getHttpRcvBufSize()))
                            .setBacklogSize(Integer.parseInt(cp.getHttpBackLogSize()))
                            .setTcpNoDelay(true)
                            .setSoReuseAddress(true)
                            .setSoKeepAlive(true)
                            .build();
                    ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
                    PoolingNHttpClientConnectionManager httpManager = new PoolingNHttpClientConnectionManager(ioReactor);
                    httpManager.setMaxTotal(Integer.parseInt(cp.getHttpMaxTotal()));
                    httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(httpManager).build();
                    httpAsyncClient.start();
                    LOG.info("Create apache async client successfully");
                } catch (IOReactorException e) {
                    LOG.error("Create apache async client failed", e);
                }
                return null;
            }
        }).get(); // completes on success and on failure, so the caller can never hang
    } finally {
        bootstrap.shutdown();
    }
}
/**
 * Builds and starts the async HTTP client.
 *
 * <p>When an SSL context is present it is installed on the builder together with an
 * {@code HttpsResponseInterceptor}; the credentials provider is always applied.
 */
private void setupAsyncClient() {
    final HttpAsyncClientBuilder builder = HttpAsyncClients.custom();

    final boolean hasSslContext = sslContext != null;
    if (hasSslContext) {
        builder.setSSLContext(sslContext);
        builder.addInterceptorFirst(new HttpsResponseInterceptor());
    }

    builder.setDefaultCredentialsProvider(getCredentialsProvider());
    httpAsyncClient = builder.build();
    httpAsyncClient.start();
}
/**
 * Builds an async HTTP client configured from the given payload.
 *
 * <p>Applies the user agent (empty string when the payload's is blank), a custom redirect
 * strategy, the shared pooling connection manager, connect/socket timeouts and, when
 * present, the proxy. Cookie handling is delegated to {@code reduceCookie}. This method does
 * not call {@code start()} on the returned client.
 *
 * @param payload source of the user agent, timeouts and proxy
 * @return the built client
 */
@Override
protected CloseableHttpAsyncClient build(Payload payload) {
    HttpAsyncClientBuilder builder = HttpAsyncClients.custom();

    // User agent: the payload's value if it is not blank, otherwise an empty string.
    builder.setUserAgent(
            StringUtils.isNotBlank(payload.getUserAgent()) ? payload.getUserAgent() : "");

    builder.setRedirectStrategy(new CustomRedirectStrategy());
    // The connection manager is marked as shared on the builder.
    builder.setConnectionManagerShared(true);

    RequestConfig timeouts = RequestConfig.custom()
            .setConnectTimeout(payload.getConnectTimeout())
            .setSocketTimeout(payload.getSocketTimeout())
            .build();

    // Connection defaults go on the shared manager: ignore bad input, decode as UTF-8.
    poolingNHttpClientConnectionManager.setDefaultConnectionConfig(
            ConnectionConfig.custom()
                    .setMalformedInputAction(CodingErrorAction.IGNORE)
                    .setUnmappableInputAction(CodingErrorAction.IGNORE)
                    .setCharset(Consts.UTF_8)
                    .build());
    builder.setConnectionManager(poolingNHttpClientConnectionManager);
    builder.setDefaultRequestConfig(timeouts);

    if (payload.getProxy() != null) {
        Proxy upstream = payload.getProxy();
        builder.setProxy(new HttpHost(upstream.getHost(), upstream.getPort(), upstream.getScheme()));
    }

    reduceCookie(builder,payload);
    return builder.build();
}
/**
 * Creates the client.
 *
 * <p>Caps the connection manager at 100 connections in total and 100 per route, then builds
 * a client that uses it. This method does not call {@code start()} on the returned client.
 *
 * @param config
 *            request configuration installed as the client's default
 * @param cm
 *            connection pool manager
 * @param openTSDBConfig
 *            consulted for its read-only flag
 * @return the built client
 */
private static CloseableHttpAsyncClient createPoolingHttpClient(RequestConfig config, PoolingNHttpClientConnectionManager cm,
        OpenTSDBConfig openTSDBConfig) {
    cm.setMaxTotal(100);
    cm.setDefaultMaxPerRoute(100);

    HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
    builder.setConnectionManager(cm);
    builder.setDefaultRequestConfig(config);

    // When not read-only, install the keep-alive strategy (long-lived connections).
    boolean writable = !openTSDBConfig.isReadonly();
    if (writable) {
        builder.setKeepAliveStrategy(myStrategy());
    }
    return builder.build();
}
/**
 * Sets up the two fixed-size worker pools and the async HTTP client used for fetching,
 * starts the client and then calls {@code start()}.
 */
public MetricFetcher() {
    // Both pools: 2 x processors threads, bounded queue, tasks discarded when the queue is full.
    int poolSize = Runtime.getRuntime().availableProcessors() * 2;
    long keepAliveMillis = 0;
    int queueCapacity = 2048;
    RejectedExecutionHandler discardWhenFull = new DiscardPolicy();

    fetchService = new ThreadPoolExecutor(
            poolSize, poolSize, keepAliveMillis, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(queueCapacity),
            new NamedThreadFactory("sentinel-dashboard-metrics-fetchService"),
            discardWhenFull);
    fetchWorker = new ThreadPoolExecutor(
            poolSize, poolSize, keepAliveMillis, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(queueCapacity),
            new NamedThreadFactory("sentinel-dashboard-metrics-fetchWorker"),
            discardWhenFull);

    IOReactorConfig reactorSettings = IOReactorConfig.custom()
            .setConnectTimeout(3000)
            .setSoTimeout(3000)
            .setIoThreadCount(Runtime.getRuntime().availableProcessors() * 2)
            .build();

    // Redirect strategy that reports no method as redirectable.
    DefaultRedirectStrategy neverRedirect = new DefaultRedirectStrategy() {
        @Override
        protected boolean isRedirectable(final String method) {
            return false;
        }
    };

    httpclient = HttpAsyncClients.custom()
            .setRedirectStrategy(neverRedirect)
            .setMaxConnTotal(4000)
            .setMaxConnPerRoute(1000)
            .setDefaultIOReactorConfig(reactorSettings)
            .build();
    httpclient.start();
    start();
}
/**
 * Creates and starts the async HTTP client used by this API client: 3000 ms connect timeout,
 * 10000 ms socket timeout, 2 x processors I/O threads, and a redirect strategy that reports
 * no method as redirectable.
 */
public SentinelApiClient() {
    IOReactorConfig reactorSettings = IOReactorConfig.custom()
            .setConnectTimeout(3000)
            .setSoTimeout(10000)
            .setIoThreadCount(Runtime.getRuntime().availableProcessors() * 2)
            .build();

    DefaultRedirectStrategy neverRedirect = new DefaultRedirectStrategy() {
        @Override
        protected boolean isRedirectable(final String method) {
            return false;
        }
    };

    httpClient = HttpAsyncClients.custom()
            .setRedirectStrategy(neverRedirect)
            .setMaxConnTotal(4000)
            .setMaxConnPerRoute(1000)
            .setDefaultIOReactorConfig(reactorSettings)
            .build();
    httpClient.start();
}
/**
 * Creates and starts the shared Apache async HTTP client from the default HTTP settings, and
 * returns once that work has finished.
 *
 * <p>The client is built on a helper thread, as in the original code. Instead of meeting the
 * helper at a {@code CyclicBarrier}, the caller waits on the task's {@code Future}. With the
 * barrier the caller blocked forever whenever the helper failed before reaching it (for
 * example on the logged {@code IOReactorException} or on a malformed numeric constant). The
 * helper executor is now also shut down instead of being abandoned.
 *
 * <p>An {@code IOReactorException} is logged and swallowed as before; in that case
 * {@code isStarted} is not set and no client is assigned.
 *
 * @throws Exception if the helper task fails with anything other than
 *                   {@code IOReactorException}, or the wait is interrupted
 */
public void initialize() throws Exception {
    // NOTE(review): building the client off the calling thread is kept from the original;
    // presumably deliberate — confirm.
    final java.util.concurrent.ExecutorService bootstrap = Executors.newCachedThreadPool();
    try {
        bootstrap.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
                            .setIoThreadCount(CPUS * 2)
                            .setConnectTimeout(Integer.parseInt(HTTPCLIENT_CONNCT_TIMEOUT_DEFAULT))
                            .setSoTimeout(Integer.parseInt(HTTPCLIENT_SOCKET_TIMEOUT_DEFAULT))
                            .setSndBufSize(Integer.parseInt(HTTPCLIENT_SEDBUFSIZE_DEFAULT))
                            .setRcvBufSize(Integer.parseInt(HTTPCLIENT_RCV_BUFSIZE_DEFAULT))
                            .setBacklogSize(Integer.parseInt(HTTPCLIENT_BACK_LOG_SIZE_DEFAULT))
                            .setTcpNoDelay(true)
                            .setSoReuseAddress(true)
                            .setSoKeepAlive(true)
                            .build();
                    ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
                    PoolingNHttpClientConnectionManager httpManager = new PoolingNHttpClientConnectionManager(ioReactor);
                    httpManager.setMaxTotal(Integer.parseInt(HTTPCLIENT_MAX_TOTAL_DEFAULT));
                    httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(httpManager).build();
                    httpAsyncClient.start();
                    LOG.info("Create apache async client successfully");
                    isStarted=true;
                } catch (IOReactorException e) {
                    LOG.error("Create apache async client failed", e);
                }
                return null;
            }
        }).get(); // completes on success and on failure, so the caller can never hang
    } finally {
        bootstrap.shutdown();
    }
}
/**
 * Executes one async GET with a callback and waits until the callback has fired.
 *
 * <p>The client is started before the request is submitted (the original never started it)
 * and is closed in a {@code finally} block. Each callback prints its message before
 * releasing the latch, so the output is written before the test method returns.
 */
@Test
public void testHttpAsyncClient() throws InterruptedException, IOException {
    CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
    try {
        httpclient.start();
        final CountDownLatch latch1 = new CountDownLatch(1);
        final HttpGet request2 = new HttpGet("http://www.apache.org/");
        httpclient.execute(request2, new FutureCallback<HttpResponse>() {
            public void completed(final HttpResponse response2) {
                System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine());
                latch1.countDown();
            }
            public void failed(final Exception ex) {
                System.out.println(request2.getRequestLine() + "->" + ex);
                latch1.countDown();
            }
            public void cancelled() {
                System.out.println(request2.getRequestLine() + " cancelled");
                latch1.countDown();
            }
        });
        latch1.await();
    } finally {
        httpclient.close();
    }
}
/**
 * Builds an async HTTP client on top of the given connection manager, using the timeouts
 * and user agent from the configuration and with cookie management disabled. This method
 * does not call {@code start()} on the returned client.
 *
 * @param config source of the connect/read timeouts and the user agent
 * @param cm     connection manager the client will use
 * @return the built client
 */
private CloseableHttpAsyncClient createHttpAsyncClient(Config config, PoolingNHttpClientConnectionManager cm){
    // The connect timeout value is used for both connecting and leasing a pooled connection.
    RequestConfig timeouts = RequestConfig.custom()
            .setConnectTimeout(config.getConnectTimeoutMillis())
            .setConnectionRequestTimeout(config.getConnectTimeoutMillis())
            .setSocketTimeout(config.getReadTimeoutMillis())
            .build();

    HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
    builder.setConnectionManager(cm);
    builder.setDefaultRequestConfig(timeouts);
    builder.setUserAgent(config.getUserAgent());
    builder.disableCookieManagement();
    return builder.build();
}
/**
 * Returns the shared async HTTP client, creating and starting it on first use.
 *
 * <p>The client is built and started in a local variable and only then assigned to the
 * shared field. The original assigned the field before calling {@code start()}, so a thread
 * taking the unsynchronized fast path could receive a client that was not yet started.
 *
 * <p>NOTE(review): the unsynchronized first check is only safe if the shared field is
 * declared {@code volatile}; the declaration is not visible here — confirm.
 *
 * @return the started, shared client
 * @throws IOReactorException if the I/O reactor cannot be created
 */
public static CloseableHttpAsyncClient getCloseableHttpAsyncClient() throws IOReactorException {
    if (closeableHttpAsyncClient == null) {
        synchronized (WxMpTemplateMsgSender.class) {
            if (closeableHttpAsyncClient == null) {
                RequestConfig requestConfig = RequestConfig.custom()
                        .setConnectTimeout(-1)
                        .setSocketTimeout(-1)
                        .setConnectionRequestTimeout(-1)
                        .build();
                // Configure the I/O threads.
                IOReactorConfig ioReactorConfig = IOReactorConfig.custom().
                        setIoThreadCount(Runtime.getRuntime().availableProcessors())
                        .setSoKeepAlive(true).setConnectTimeout(-1).setSoTimeout(-1)
                        .build();
                // Set up the connection pool.
                ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
                PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor);
                // Maximum number of connections in total.
                connManager.setMaxTotal(5000);
                // Maximum number of connections per route.
                connManager.setDefaultMaxPerRoute(5000);
                CloseableHttpAsyncClient client = HttpAsyncClients.custom().
                        setConnectionManager(connManager)
                        .setDefaultRequestConfig(requestConfig)
                        .build();
                // Start first, publish second.
                client.start();
                closeableHttpAsyncClient = client;
            }
        }
    }
    return closeableHttpAsyncClient;
}
/**
 * Builds an async HTTP client on the given connection manager, wired with this object's
 * request configuration, route planner, proxy authentication strategy and credentials
 * provider. This method does not call {@code start()} on the returned client.
 *
 * @param connectionManager connection manager the client will use
 * @return the built client
 */
protected CloseableHttpAsyncClient createAsyncHttpClient(NHttpClientConnectionManager connectionManager) {
    HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
    builder.setConnectionManager(connectionManager);
    builder.setDefaultRequestConfig(getRequestConfig());
    builder.setProxyAuthenticationStrategy(
            wrappedHttpClientConfig.getHttpClientConfig().getProxyAuthenticationStrategy());
    builder.setRoutePlanner(getRoutePlanner());
    builder.setDefaultCredentialsProvider(
            wrappedHttpClientConfig.getHttpClientConfig().getCredentialsProvider());
    return builder.build();
}