下面列出了 org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Creates the factory with a connection pool it builds itself.
 *
 * <p>The I/O reactor uses eight threads per available processor and takes its connect and
 * socket timeouts from {@code httpConfig}. The pool is limited to 1024 connections in total and
 * 256 per route, after which {@code initHttpClient()} is invoked. Any {@link Exception} raised
 * along the way is logged and not rethrown.
 */
public HttpAsyncClientFactory() {
    try {
        int reactorThreads = Runtime.getRuntime().availableProcessors() * 8;
        IOReactorConfig.Builder reactorSettings = IOReactorConfig.custom();
        reactorSettings.setIoThreadCount(reactorThreads);
        reactorSettings.setConnectTimeout(httpConfig.getConnectTimeout());
        reactorSettings.setSoTimeout(httpConfig.getReadTimeout());

        ConnectingIOReactor reactor =
                new DefaultConnectingIOReactor(reactorSettings.build(), new NamedThreadFactory("flower-http"));

        PoolingNHttpClientConnectionManager pool = new PoolingNHttpClientConnectionManager(reactor);
        // Publish the manager to the field before configuring it, as the limits are applied in place.
        this.connectionManager = pool;
        pool.setMaxTotal(1024);
        pool.setDefaultMaxPerRoute(256);

        initHttpClient();
    } catch (Exception e) {
        logger.error("", e);
    }
}
/**
 * Starts a single-threaded scheduler that periodically prunes the given connection pool.
 *
 * <p>After an initial delay of 30 seconds, and every 30 seconds thereafter, the task closes
 * expired connections and connections that have been idle for 30 seconds. Exceptions thrown by
 * a run are caught and logged. The worker is a daemon thread named
 * {@code Fixed-Cycle-Close-Connection-<n>}.
 *
 * @param cm connection pool manager to prune
 * @return the scheduler that runs the pruning task
 */
private static ScheduledExecutorService initFixedCycleCloseConnection(final PoolingNHttpClientConnectionManager cm) {
    // Worker threads come from a factory that names them and marks them as daemons.
    ScheduledExecutorService pruner = Executors.newSingleThreadScheduledExecutor(task -> {
        Thread worker = new Thread(task, "Fixed-Cycle-Close-Connection-" + NUM.incrementAndGet());
        worker.setDaemon(true);
        return worker;
    });
    // Periodically close expired and idle connections.
    pruner.scheduleAtFixedRate(() -> {
        try {
            log.debug("Close idle connections, fixed cycle operation");
            // Drop connections that have been inactive for 30 seconds.
            cm.closeExpiredConnections();
            cm.closeIdleConnections(30, TimeUnit.SECONDS);
        } catch (Exception ex) {
            log.error("", ex);
        }
    }, 30, 30, TimeUnit.SECONDS);
    return pruner;
}
/**
 * Creates the asynchronous connection manager and applies the pool limits found in the wrapped
 * {@code HttpClientConfig}.
 *
 * <p>The total limit and the default per-route limit are applied only when they are non-null;
 * every entry of the per-route map is applied individually.
 *
 * @return the configured connection manager
 */
@Override
protected NHttpClientConnectionManager getAsyncConnectionManager() {
    final PoolingNHttpClientConnectionManager pool = createUnconfiguredPoolingNHttpClientConnectionManager();
    final HttpClientConfig settings = this.wrappedHttpClientConfig.getHttpClientConfig();

    final Integer totalLimit = settings.getMaxTotalConnection();
    if (totalLimit != null) {
        pool.setMaxTotal(totalLimit);
    }

    final Integer perRouteDefault = settings.getDefaultMaxTotalConnectionPerRoute();
    if (perRouteDefault != null) {
        pool.setDefaultMaxPerRoute(perRouteDefault);
    }

    final Map<HttpRoute, Integer> routeLimits = settings.getMaxTotalConnectionPerRoute();
    for (Map.Entry<HttpRoute, Integer> routeLimit : routeLimits.entrySet()) {
        pool.setMaxPerRoute(routeLimit.getKey(), routeLimit.getValue());
    }

    return pool;
}
/** Checks that a configured total connection limit is passed to the connection manager. */
@Test
public void getAsyncConnectionManagerConfiguresMaxTotalIfConfigured() {

    // given: a client config carrying a random total-connection limit
    HttpClientConfig.Builder clientConfig = createDefaultTestHttpClientConfigBuilder();
    int configuredMaxTotal = random.nextInt(100) + 10;
    clientConfig.maxTotalConnection(configuredMaxTotal);

    WrappedHttpClientConfig.Builder wrappedConfig =
            createDefaultTestWrappedHttpClientConfigBuilder(clientConfig.build());
    ExtendedJestClientFactory clientFactory = spy(new ExtendedJestClientFactory(wrappedConfig.build()));

    PoolingNHttpClientConnectionManager managerMock = mock(PoolingNHttpClientConnectionManager.class);
    when(clientFactory.createUnconfiguredPoolingNHttpClientConnectionManager())
            .thenReturn(managerMock);

    // when
    clientFactory.getAsyncConnectionManager();

    // then
    verify(managerMock).setMaxTotal(eq(configuredMaxTotal));
}
/** Checks that a configured default per-route limit is passed to the connection manager. */
@Test
public void getAsyncConnectionManagerConfiguresDefaultMaxTotalPerRouteIfConfigured() {

    // given: a client config carrying a random default per-route limit
    HttpClientConfig.Builder clientConfig = createDefaultTestHttpClientConfigBuilder();
    int configuredDefaultPerRoute = random.nextInt(100) + 10;
    clientConfig.defaultMaxTotalConnectionPerRoute(configuredDefaultPerRoute);

    WrappedHttpClientConfig.Builder wrappedConfig =
            createDefaultTestWrappedHttpClientConfigBuilder(clientConfig.build());
    ExtendedJestClientFactory clientFactory = spy(new ExtendedJestClientFactory(wrappedConfig.build()));

    PoolingNHttpClientConnectionManager managerMock = mock(PoolingNHttpClientConnectionManager.class);
    when(clientFactory.createUnconfiguredPoolingNHttpClientConnectionManager())
            .thenReturn(managerMock);

    // when
    clientFactory.getAsyncConnectionManager();

    // then
    verify(managerMock).setDefaultMaxPerRoute(eq(configuredDefaultPerRoute));
}
/** Checks that a limit configured for a specific route is passed to the connection manager. */
@Test
public void getAsyncConnectionManagerConfiguresMaxTotalPerRouteIfConfigured() {

    // given: a client config carrying a random limit for the "localhost" route
    HttpClientConfig.Builder clientConfig = createDefaultTestHttpClientConfigBuilder();
    HttpRoute configuredRoute = new HttpRoute(new HttpHost("localhost"));
    int configuredRouteLimit = random.nextInt(100) + 10;
    clientConfig.maxTotalConnectionPerRoute(configuredRoute, configuredRouteLimit);

    WrappedHttpClientConfig.Builder wrappedConfig =
            createDefaultTestWrappedHttpClientConfigBuilder(clientConfig.build());
    ExtendedJestClientFactory clientFactory = spy(new ExtendedJestClientFactory(wrappedConfig.build()));

    PoolingNHttpClientConnectionManager managerMock = mock(PoolingNHttpClientConnectionManager.class);
    when(clientFactory.createUnconfiguredPoolingNHttpClientConnectionManager())
            .thenReturn(managerMock);

    // when
    clientFactory.getAsyncConnectionManager();

    // then
    verify(managerMock).setMaxPerRoute(eq(configuredRoute), eq(configuredRouteLimit));
}
/**
 * Builds and starts an asynchronous HTTP client configured from the given settings.
 *
 * <p>The I/O reactor uses one thread per available processor. Connect timeout, socket timeout,
 * charset, total pool size and per-route pool size are read from {@code conf}, with the
 * fallback values given at each lookup. Malformed and unmappable input is ignored when decoding.
 *
 * @param conf configuration source
 * @return the client, already started
 * @throws IOReactorException if the I/O reactor cannot be created
 */
private CloseableHttpAsyncClient createHttpAsyncClient(YunpianConf conf) throws IOReactorException {
    final IOReactorConfig reactorSettings = IOReactorConfig.custom()
            .setIoThreadCount(Runtime.getRuntime().availableProcessors())
            .setConnectTimeout(conf.getConfInt(YunpianConf.HTTP_CONN_TIMEOUT, "10000"))
            .setSoTimeout(conf.getConfInt(YunpianConf.HTTP_SO_TIMEOUT, "30000"))
            .build();
    final PoolingNHttpClientConnectionManager pool =
            new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(reactorSettings));

    final ConnectionConfig decoding = ConnectionConfig.custom()
            .setMalformedInputAction(CodingErrorAction.IGNORE)
            .setUnmappableInputAction(CodingErrorAction.IGNORE)
            .setCharset(Charset.forName(conf.getConf(YunpianConf.HTTP_CHARSET, YunpianConf.HTTP_CHARSET_DEFAULT)))
            .build();
    pool.setDefaultConnectionConfig(decoding);
    pool.setMaxTotal(conf.getConfInt(YunpianConf.HTTP_CONN_MAXTOTAL, "100"));
    pool.setDefaultMaxPerRoute(conf.getConfInt(YunpianConf.HTTP_CONN_MAXPERROUTE, "10"));

    final CloseableHttpAsyncClient asyncClient = HttpAsyncClients.custom()
            .setConnectionManager(pool)
            .build();
    asyncClient.start();
    return asyncClient;
}
/**
 * Creates an executor backed by a pooled asynchronous HTTP client wrapped in a
 * {@code FiberHttpClient}.
 *
 * <p>{@code timeout} is used for both the connect and the socket timeout of the I/O reactor, and
 * {@code maxConnections} is used for both the per-route and the total pool limit. The
 * asynchronous client is handed to {@code FiberHttpClient} without {@code start()} being called
 * here.
 *
 * @param resValidator validator stored for responses
 * @param maxConnections pool limit, per route and in total
 * @param timeout connect and socket timeout passed to the I/O reactor
 * @param parallelism number of I/O reactor threads
 * @throws IOReactorException if the I/O reactor cannot be created
 */
public FiberApacheHttpClientRequestExecutor(final Validator<CloseableHttpResponse> resValidator, final int maxConnections, final int timeout, final int parallelism) throws IOReactorException {
    final IOReactorConfig reactorSettings = IOReactorConfig.custom()
            .setConnectTimeout(timeout)
            .setIoThreadCount(parallelism)
            .setSoTimeout(timeout)
            .build();
    final PoolingNHttpClientConnectionManager pool =
            new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(reactorSettings));
    pool.setDefaultMaxPerRoute(maxConnections);
    pool.setMaxTotal(maxConnections);

    final RequestConfig requestDefaults = RequestConfig.custom().setLocalAddress(null).build();
    final CloseableHttpAsyncClient asyncClient = HttpAsyncClientBuilder.create()
            .setConnectionManager(pool)
            .setDefaultRequestConfig(requestDefaults)
            .build();

    client = new FiberHttpClient(asyncClient);
    validator = resValidator;
}
/**
 * Builds an asynchronous HTTP client that sends its requests through the given proxy.
 *
 * <p>The connection manager from {@code getAsyncConnectionManager()} is used when it is
 * non-null, and redirect handling is applied through {@code setRedirects}. The returned client
 * is not started here.
 *
 * @param proxyUri URI whose host and port identify the proxy; must not be null
 * @return the built client
 */
private CloseableHttpAsyncClient getAsyncProxyHttpClient(URI proxyUri) {
    LOG.info("Creating async proxy http client");
    PoolingNHttpClientConnectionManager cm = getAsyncConnectionManager();
    // NOTE(review): only host and port are taken from proxyUri; its scheme is not passed on - confirm intended.
    HttpHost proxy = new HttpHost(proxyUri.getHost(), proxyUri.getPort());
    HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom();
    if (cm != null) {
        clientBuilder = clientBuilder.setConnectionManager(cm);
    }
    // The proxy was just constructed above and can never be null, so it is set unconditionally.
    clientBuilder = clientBuilder.setProxy(proxy);
    clientBuilder = setRedirects(clientBuilder);
    return clientBuilder.build();
}
/**
 * Creates the asynchronous connection manager named by the {@code HTTP_CONN_MANAGER} setting.
 *
 * <p>Only {@code POOLING} is supported. Its total and per-route limits are read from
 * {@code POOLING_CONN_MANAGER_MAX_TOTAL_CONN} and {@code POOLING_CONN_MANAGER_MAX_PER_CONN}.
 *
 * @param config configuration to read the manager type and pool limits from
 * @return the configured connection manager
 * @throws IOException if the I/O reactor cannot be created
 * @throws IllegalArgumentException if the configured manager name is unknown or not supported
 */
private NHttpClientConnectionManager getNHttpConnManager(Config config) throws IOException {
    NHttpClientConnectionManager httpConnManager;
    String connMgrStr = config.getString(HTTP_CONN_MANAGER);
    // Upper-case with Locale.ROOT so the enum lookup does not depend on the JVM default locale
    // (under a Turkish locale "pooling" would otherwise not map to POOLING).
    switch (ApacheHttpClient.ConnManager.valueOf(connMgrStr.toUpperCase(java.util.Locale.ROOT))) {
        case POOLING:
            ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
            PoolingNHttpClientConnectionManager poolingConnMgr = new PoolingNHttpClientConnectionManager(ioReactor);
            poolingConnMgr.setMaxTotal(config.getInt(POOLING_CONN_MANAGER_MAX_TOTAL_CONN));
            poolingConnMgr.setDefaultMaxPerRoute(config.getInt(POOLING_CONN_MANAGER_MAX_PER_CONN));
            httpConnManager = poolingConnMgr;
            break;
        default:
            throw new IllegalArgumentException(connMgrStr + " is not supported");
    }
    LOG.info("Using " + httpConnManager.getClass().getSimpleName());
    return httpConnManager;
}
/**
 * Builds an asynchronous HTTP client from the {@code ASYNC_REST_TEMPLATE} section of
 * {@code configMap}.
 *
 * <p>That section supplies the total and default per-route pool limits, a map of host names to
 * per-route limits, and the connect timeout. The returned client is not started here.
 *
 * @return the built client
 * @throws Exception if the I/O reactor or the session strategy registry cannot be created
 */
private CloseableHttpAsyncClient asyncHttpClient() throws Exception {
    PoolingNHttpClientConnectionManager pool =
            new PoolingNHttpClientConnectionManager(ioReactor(), asyncRegistry());

    Map<String, Object> settings = (Map<String, Object>) configMap.get(ASYNC_REST_TEMPLATE);
    pool.setMaxTotal((Integer) settings.get(MAX_CONNECTION_TOTAL));
    pool.setDefaultMaxPerRoute((Integer) settings.get(MAX_CONNECTION_PER_ROUTE));

    // Apply the limit of every explicitly configured route.
    Map<String, Object> routeLimits = (Map<String, Object>) settings.get(ROUTES);
    for (Map.Entry<String, Object> routeLimit : routeLimits.entrySet()) {
        String host = routeLimit.getKey();
        Integer limit = (Integer) routeLimit.getValue();
        pool.setMaxPerRoute(new HttpRoute(new HttpHost(host)), limit);
    }

    RequestConfig requestDefaults = RequestConfig.custom()
            .setConnectTimeout((Integer) settings.get(TIMEOUT_MILLISECONDS))
            .build();

    HttpAsyncClientBuilder builder = HttpAsyncClientBuilder.create();
    builder.setConnectionManager(pool);
    builder.setDefaultRequestConfig(requestDefaults);
    return builder.build();
}
/**
 * Creates and starts the shared asynchronous HTTP client on a background thread and waits until
 * that attempt has finished.
 *
 * <p>The I/O reactor uses {@code CommonProperties.CPUS * 2} threads; timeouts, buffer sizes,
 * backlog size and the total pool limit are parsed from the common properties. A failure while
 * creating the client is logged and this method then returns without a started client.
 *
 * @param properties source of the common HTTP client settings
 * @throws Exception if waiting for the background task is interrupted or the barrier is broken
 */
public void initialize(final NettyRpcProperties properties) throws Exception {
    final CommonProperties cp = properties.getCommonProperties();
    final CyclicBarrier barrier = new CyclicBarrier(2);
    final java.util.concurrent.ExecutorService bootstrapExecutor = Executors.newCachedThreadPool();
    try {
        bootstrapExecutor.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
                            .setIoThreadCount(CommonProperties.CPUS * 2)
                            .setConnectTimeout(Integer.parseInt(cp.getHttpConnectTimeout()))
                            .setSoTimeout(Integer.parseInt(cp.getHttpSocketTimeout()))
                            .setSndBufSize(Integer.parseInt(cp.getHttpSendBufSize()))
                            .setRcvBufSize(Integer.parseInt(cp.getHttpRcvBufSize()))
                            .setBacklogSize(Integer.parseInt(cp.getHttpBackLogSize()))
                            .setTcpNoDelay(true)
                            .setSoReuseAddress(true)
                            .setSoKeepAlive(true)
                            .build();
                    ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
                    PoolingNHttpClientConnectionManager httpManager = new PoolingNHttpClientConnectionManager(ioReactor);
                    httpManager.setMaxTotal(Integer.parseInt(cp.getHttpMaxTotal()));
                    httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(httpManager).build();
                    httpAsyncClient.start();
                    LOG.info("Create apache async client successfully");
                } catch (IOReactorException e) {
                    LOG.error("Create apache async client failed", e);
                } catch (RuntimeException e) {
                    // For example a malformed numeric property; log it instead of losing it in the Future.
                    LOG.error("Create apache async client failed", e);
                } finally {
                    // Rendezvous in finally so the caller is released even when creation fails;
                    // otherwise initialize() would wait on the barrier forever.
                    barrier.await();
                }
                return null;
            }
        });
        barrier.await();
    } finally {
        // The pool exists only for this one task; shutdown() lets its worker exit once the task ends.
        bootstrapExecutor.shutdown();
    }
}
/**
 * Creates an {@code HttpClient} backed by a pooled asynchronous client.
 *
 * <p>The connection pool is built on the reactor returned by {@code initIOReactorConfig()}, and
 * the scheduler returned by {@code initFixedCycleCloseConnection} for that pool is handed to the
 * new {@code HttpClient}.
 *
 * @param config client configuration; must not be null
 * @return the new client wrapper
 * @throws IOReactorException if the I/O reactor cannot be created
 * @throws NullPointerException if {@code config} is null
 */
public static HttpClient createHttpClient(OpenTSDBConfig config) throws IOReactorException {
    Objects.requireNonNull(config);

    PoolingNHttpClientConnectionManager pool =
            new PoolingNHttpClientConnectionManager(initIOReactorConfig());
    CloseableHttpAsyncClient asyncClient =
            createPoolingHttpClient(initRequestConfig(config), pool, config);

    return new HttpClient(config, asyncClient, initFixedCycleCloseConnection(pool));
}
/**
 * Builds the asynchronous client on top of the given connection pool.
 *
 * <p>The pool is limited to 100 connections, both in total and per route. When the configuration
 * is not read-only, the keep-alive strategy from {@code myStrategy()} is installed.
 *
 * @param config request configuration used as the client's default
 * @param cm connection pool manager; its limits are modified by this call
 * @param openTSDBConfig client configuration consulted for the read-only flag
 * @return the built client
 */
private static CloseableHttpAsyncClient createPoolingHttpClient(RequestConfig config, PoolingNHttpClientConnectionManager cm,
        OpenTSDBConfig openTSDBConfig) {
    cm.setMaxTotal(100);
    cm.setDefaultMaxPerRoute(100);

    HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
    builder.setConnectionManager(cm);
    builder.setDefaultRequestConfig(config);

    // Writable clients use the keep-alive strategy so connections stay open.
    boolean writable = !openTSDBConfig.isReadonly();
    if (writable) {
        builder.setKeepAliveStrategy(myStrategy());
    }

    return builder.build();
}
/**
 * Creates and starts the shared asynchronous HTTP client on a background thread and waits until
 * that attempt has finished.
 *
 * <p>All settings come from the {@code HTTPCLIENT_*_DEFAULT} constants. {@code isStarted} is set
 * to {@code true} only after the client has started; a failure is logged and this method then
 * returns with {@code isStarted} unchanged.
 *
 * @throws Exception if waiting for the background task is interrupted or the barrier is broken
 */
public void initialize() throws Exception {
    final CyclicBarrier barrier = new CyclicBarrier(2);
    final java.util.concurrent.ExecutorService bootstrapExecutor = Executors.newCachedThreadPool();
    try {
        bootstrapExecutor.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
                            .setIoThreadCount(CPUS * 2)
                            .setConnectTimeout(Integer.parseInt(HTTPCLIENT_CONNCT_TIMEOUT_DEFAULT))
                            .setSoTimeout(Integer.parseInt(HTTPCLIENT_SOCKET_TIMEOUT_DEFAULT))
                            .setSndBufSize(Integer.parseInt(HTTPCLIENT_SEDBUFSIZE_DEFAULT))
                            .setRcvBufSize(Integer.parseInt(HTTPCLIENT_RCV_BUFSIZE_DEFAULT))
                            .setBacklogSize(Integer.parseInt(HTTPCLIENT_BACK_LOG_SIZE_DEFAULT))
                            .setTcpNoDelay(true)
                            .setSoReuseAddress(true)
                            .setSoKeepAlive(true)
                            .build();
                    ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
                    PoolingNHttpClientConnectionManager httpManager = new PoolingNHttpClientConnectionManager(ioReactor);
                    httpManager.setMaxTotal(Integer.parseInt(HTTPCLIENT_MAX_TOTAL_DEFAULT));
                    httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(httpManager).build();
                    httpAsyncClient.start();
                    LOG.info("Create apache async client successfully");
                    isStarted=true;
                } catch (IOReactorException e) {
                    LOG.error("Create apache async client failed", e);
                } catch (RuntimeException e) {
                    // For example a malformed numeric constant; log it instead of losing it in the Future.
                    LOG.error("Create apache async client failed", e);
                } finally {
                    // Rendezvous in finally so the caller is released even when creation fails;
                    // otherwise initialize() would wait on the barrier forever.
                    barrier.await();
                }
                return null;
            }
        });
        barrier.await();
    } finally {
        // The pool exists only for this one task; shutdown() lets its worker exit once the task ends.
        bootstrapExecutor.shutdown();
    }
}
/**
 * Starts a single-threaded scheduler that periodically closes idle connections of the pool.
 *
 * <p>After an initial delay of 30 seconds, and every 30 seconds thereafter, connections idle
 * for three minutes are closed. Exceptions thrown by a run are caught and logged. The worker is
 * a daemon thread named {@code Fixed-Cycle-Close-Connection-<n>}.
 *
 * @param cm connection pool manager to prune
 * @return the scheduler that runs the pruning task
 */
private static ScheduledExecutorService initFixedCycleCloseConnection(final PoolingNHttpClientConnectionManager cm) {
    // Factory producing the named daemon worker.
    final ThreadFactory daemonFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            Thread worker = new Thread(task, "Fixed-Cycle-Close-Connection-" + NUM.incrementAndGet());
            worker.setDaemon(true);
            return worker;
        }
    };
    final ScheduledExecutorService pruner = Executors.newSingleThreadScheduledExecutor(daemonFactory);

    // Task that closes all idle connections on a fixed cycle.
    final Runnable closeIdleTask = new Runnable() {
        @Override
        public void run() {
            try {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Close idle connections, fixed cycle operation");
                }
                cm.closeIdleConnections(3, TimeUnit.MINUTES);
            } catch (Exception ex) {
                LOGGER.error("", ex);
            }
        }
    };
    pruner.scheduleAtFixedRate(closeIdleTask, 30, 30, TimeUnit.SECONDS);
    return pruner;
}
/**
 * Creates the client, builds its pooled asynchronous HTTP client and starts it.
 *
 * <p>The pool's total and per-route limits are taken from {@code config}; the I/O reactor uses
 * the default {@code IOReactorConfig}.
 *
 * @param config client configuration
 * @throws ClientException if the I/O reactor cannot be created
 */
public AsyncInternalClient(Config config){
    this.config = config;
    try {
        ConnectingIOReactor reactor = new DefaultConnectingIOReactor(IOReactorConfig.custom().build());
        PoolingNHttpClientConnectionManager pool = new PoolingNHttpClientConnectionManager(reactor);
        pool.setMaxTotal(config.getMaxConnectCount());
        pool.setDefaultMaxPerRoute(config.getMaxPerRoute());

        httpClient = createHttpAsyncClient(config, pool);
        httpClient.start();
    } catch (IOReactorException e) {
        throw new ClientException(e);
    }
}
/**
 * Builds an asynchronous HTTP client on the given pool with timeouts and user agent from
 * {@code config}; cookie management is disabled.
 *
 * <p>The connect timeout value is also used as the connection-request timeout. The returned
 * client is not started here.
 *
 * @param config source of the timeouts and the user agent
 * @param cm connection pool manager the client will use
 * @return the built client
 */
private CloseableHttpAsyncClient createHttpAsyncClient(Config config, PoolingNHttpClientConnectionManager cm){
    HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
    builder.setConnectionManager(cm);

    RequestConfig.Builder timeouts = RequestConfig.custom();
    timeouts.setConnectTimeout(config.getConnectTimeoutMillis());
    timeouts.setConnectionRequestTimeout(config.getConnectTimeoutMillis());
    timeouts.setSocketTimeout(config.getReadTimeoutMillis());
    builder.setDefaultRequestConfig(timeouts.build());

    builder.setUserAgent(config.getUserAgent());
    builder.disableCookieManagement();
    return builder.build();
}
/**
 * Returns the shared asynchronous HTTP client, creating and starting it on first use.
 *
 * <p>Creation is guarded by a lock on {@code WxMpTemplateMsgSender.class} with a null check
 * before and inside the lock. The client is stored in the shared field only after
 * {@code start()} has returned, so callers that pass the unsynchronized check never receive an
 * unstarted client.
 *
 * <p>NOTE(review): the unsynchronized first check is only safe if
 * {@code closeableHttpAsyncClient} is declared {@code volatile}; the declaration is not visible
 * here, so confirm it.
 *
 * @return the started shared client
 * @throws IOReactorException if the I/O reactor cannot be created
 */
public static CloseableHttpAsyncClient getCloseableHttpAsyncClient() throws IOReactorException {
    if (closeableHttpAsyncClient == null) {
        synchronized (WxMpTemplateMsgSender.class) {
            if (closeableHttpAsyncClient == null) {
                // NOTE(review): -1 is passed for every timeout; confirm against the RequestConfig
                // and IOReactorConfig documentation what a negative value means.
                RequestConfig requestConfig = RequestConfig.custom()
                        .setConnectTimeout(-1)
                        .setSocketTimeout(-1)
                        .setConnectionRequestTimeout(-1)
                        .build();
                // Configure the I/O threads.
                IOReactorConfig ioReactorConfig = IOReactorConfig.custom().
                        setIoThreadCount(Runtime.getRuntime().availableProcessors())
                        .setSoKeepAlive(true).setConnectTimeout(-1).setSoTimeout(-1)
                        .build();
                // Set the connection pool size.
                ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
                PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor);
                // Maximum number of connections in total.
                connManager.setMaxTotal(5000);
                // Maximum number of connections per route.
                connManager.setDefaultMaxPerRoute(5000);
                CloseableHttpAsyncClient startedClient = HttpAsyncClients.custom().
                        setConnectionManager(connManager)
                        .setDefaultRequestConfig(requestConfig)
                        .build();
                startedClient.start();
                // Publish only after start(): assigning the field first would let a concurrent
                // caller see a non-null but not yet started client.
                closeableHttpAsyncClient = startedClient;
            }
        }
    }
    return closeableHttpAsyncClient;
}
/**
 * Creates a pooling connection manager from {@code createIOReactor()} and
 * {@code createSchemeIOSessionStrategyRegistry()} without applying any pool limits.
 *
 * @return the new connection manager
 * @throws IllegalStateException wrapping the {@code IOReactorException} raised during creation
 */
PoolingNHttpClientConnectionManager createUnconfiguredPoolingNHttpClientConnectionManager() {
    final PoolingNHttpClientConnectionManager unconfigured;
    try {
        unconfigured = new PoolingNHttpClientConnectionManager(
                createIOReactor(), createSchemeIOSessionStrategyRegistry());
    } catch (IOReactorException e) {
        throw new IllegalStateException(e);
    }
    return unconfigured;
}
/**
 * Creates a pooling connection manager from {@code createIOReactor()} and
 * {@code createSchemeIOSessionStrategyRegistry()} without applying any pool limits.
 *
 * @return the new connection manager
 * @throws IllegalStateException wrapping the {@code IOReactorException} raised during creation
 */
PoolingNHttpClientConnectionManager createUnconfiguredPoolingNHttpClientConnectionManager() {
    final PoolingNHttpClientConnectionManager unconfigured;
    try {
        unconfigured = new PoolingNHttpClientConnectionManager(
                createIOReactor(), createSchemeIOSessionStrategyRegistry());
    } catch (IOReactorException e) {
        throw new IllegalStateException(e);
    }
    return unconfigured;
}
/**
 * Creates the client, its pooled asynchronous HTTP client and an idle-connection evictor, then
 * starts both.
 *
 * <p>{@code config.getMaxConnections()} is used for both the total and the per-route pool limit.
 *
 * @param config client configuration
 * @throws ClientException if the I/O reactor cannot be created
 */
public AsyncServiceClient(ClientConfiguration config) {
    try {
        ConnectingIOReactor reactor = new DefaultConnectingIOReactor(
                IOReactorConfig.custom().setIoThreadCount(config.getIoThreadCount()).build());
        PoolingNHttpClientConnectionManager pool = new PoolingNHttpClientConnectionManager(reactor);
        pool.setMaxTotal(config.getMaxConnections());
        pool.setDefaultMaxPerRoute(config.getMaxConnections());
        httpClient = HttpFactory.createHttpAsyncClient(config, pool);

        /*
         * The socket timeout bounds how often closeIdleConnections should run. If the period is
         * too long relative to the socket timeout, a request may be assigned a connection that
         * is about to hit the socket timeout and fail with SocketTimeoutException before it is
         * even sent. The period is therefore socketTimeout / 2.5, capped at 5000 ms.
         */
        long evictionPeriod = 5000;
        if (config.getSocketTimeoutInMillisecond() > 0) {
            evictionPeriod = Math.min((long) (config.getSocketTimeoutInMillisecond() / 2.5), 5000L);
        }
        connEvictor = new IdleConnectionEvictor(pool, evictionPeriod);

        httpClient.start();
        connEvictor.start();
    } catch (IOReactorException ex) {
        throw new ClientException(String.format("IOReactorError: %s",
                ex.getMessage()), ex);
    }
}
/**
 * Builds an asynchronous HTTP client on the given pool with timeouts and optional proxy
 * settings taken from {@code config}.
 *
 * <p>Cookie management is disabled and the user agent is {@code Constants.USER_AGENT}. When a
 * proxy host is configured, the proxy is installed; NT credentials are registered for it only
 * when both a proxy username and a proxy password are present. The returned client is not
 * started here.
 *
 * @param config source of the timeouts and proxy settings
 * @param cm connection pool manager the client will use
 * @return the built client
 * @throws ClientException if a proxy host is configured with a port that is not positive
 */
public static CloseableHttpAsyncClient createHttpAsyncClient(
        ClientConfiguration config, PoolingNHttpClientConnectionManager cm) {
    final HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
    builder.setConnectionManager(cm);
    builder.setDefaultRequestConfig(RequestConfig.custom()
            .setConnectTimeout(config.getConnectionTimeoutInMillisecond())
            .setSocketTimeout(config.getSocketTimeoutInMillisecond())
            .build());
    builder.setUserAgent(Constants.USER_AGENT);
    builder.disableCookieManagement();

    final String proxyHost = config.getProxyHost();
    final int proxyPort = config.getProxyPort();
    if (proxyHost == null) {
        // No proxy configured: nothing more to set up.
        return builder.build();
    }
    if (proxyPort <= 0) {
        throw new ClientException("The proxy port is invalid. Please check your configuration.");
    }
    builder.setProxy(new HttpHost(proxyHost, proxyPort));

    final String proxyUsername = config.getProxyUsername();
    final String proxyPassword = config.getProxyPassword();
    if (proxyUsername != null && proxyPassword != null) {
        final String proxyDomain = config.getProxyDomain();
        final String proxyWorkstation = config.getProxyWorkstation();
        final CredentialsProvider proxyCredentials = new BasicCredentialsProvider();
        proxyCredentials.setCredentials(
                new AuthScope(proxyHost, proxyPort),
                new NTCredentials(proxyUsername, proxyPassword, proxyWorkstation, proxyDomain));
        builder.setDefaultCredentialsProvider(proxyCredentials);
    }
    return builder.build();
}
/**
 * Creates and starts the shared asynchronous HTTP client on a background thread and waits for
 * it, for at most twice the configured connect timeout (in milliseconds).
 *
 * <p>The I/O reactor uses {@code ThunderConstant.CPUS} threads; the total pool limit is the
 * configured maximum multiplied by {@code ThunderConstant.CPUS}. If the background task fails,
 * the failure is logged there and this method fails once the wait times out.
 *
 * @param properties source of the HTTP client settings
 * @throws Exception if the wait times out, is interrupted, or the barrier is broken
 */
public void initialize(final ThunderProperties properties) throws Exception {
    final CyclicBarrier barrier = new CyclicBarrier(2);
    final java.util.concurrent.ExecutorService bootstrapExecutor = Executors.newCachedThreadPool();
    try {
        bootstrapExecutor.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
                            .setIoThreadCount(ThunderConstant.CPUS)
                            .setConnectTimeout(properties.getInteger(ThunderConstant.APACHE_CONNECT_TIMEOUT_ATTRIBUTE_NAME))
                            .setSoTimeout(properties.getInteger(ThunderConstant.APACHE_SO_TIMEOUT_ATTRIBUTE_NAME))
                            .setSndBufSize(properties.getInteger(ThunderConstant.APACHE_SNDBUF_SIZE_ATTRIBUTE_NAME))
                            .setRcvBufSize(properties.getInteger(ThunderConstant.APACHE_RCVBUF_SIZE_ATTRIBUTE_NAME))
                            .setBacklogSize(properties.getInteger(ThunderConstant.APACHE_BACKLOG_SIZE_ATTRIBUTE_NAME))
                            .setTcpNoDelay(true)
                            .setSoReuseAddress(true)
                            .setSoKeepAlive(true)
                            .build();
                    ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
                    PoolingNHttpClientConnectionManager httpManager = new PoolingNHttpClientConnectionManager(ioReactor);
                    httpManager.setMaxTotal(ThunderConstant.CPUS * properties.getInteger(ThunderConstant.APACHE_MAX_TOTAL_ATTRIBUTE_NAME));
                    httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(httpManager).build();
                    httpAsyncClient.start();
                    LOG.info("Create apache async client successfully");
                    barrier.await();
                } catch (IOReactorException e) {
                    LOG.error("Create apache async client failed", e);
                }
                return null;
            }
        });
        barrier.await(properties.getLong(ThunderConstant.APACHE_CONNECT_TIMEOUT_ATTRIBUTE_NAME) * 2, TimeUnit.MILLISECONDS);
    } finally {
        // The pool exists only for this one task; shutdown() lets its worker exit once the task
        // ends instead of lingering as an idle pool thread.
        bootstrapExecutor.shutdown();
    }
}
/**
 * Creates the connection manager for the asynchronous HTTP client.
 *
 * <p>The I/O reactor uses the configured socket timeout with TCP_NODELAY enabled, and
 * {@code config.getMaxConnections()} is used for both the per-route and the total pool limit.
 *
 * @return connection manager for the asynchronous HTTP client
 * @throws IOReactorException in case of a non-recoverable I/O error
 */
protected NHttpClientConnectionManager createNHttpClientConnectionManager() throws IOReactorException {
    final IOReactorConfig reactorSettings = IOReactorConfig.custom()
            .setSoTimeout(this.config.getSocketTimeoutInMillis())
            .setTcpNoDelay(true)
            .build();
    final PoolingNHttpClientConnectionManager pool =
            new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(reactorSettings));
    pool.setDefaultMaxPerRoute(this.config.getMaxConnections());
    pool.setMaxTotal(this.config.getMaxConnections());
    return pool;
}
/**
 * Builds an asynchronous HTTP client on the given connection manager with proxy, request and
 * pool settings taken from {@code config}.
 *
 * <p>A proxy is installed only when a proxy host and a positive proxy port are configured; NT
 * credentials are registered for it only when both a proxy username and a proxy password are
 * present. The returned client is not started here.
 *
 * @param connManager connection pool manager the client will use
 * @param config source of proxy, timeout and connection-limit settings
 * @return the built client
 */
public static CloseableHttpAsyncClient createHttpAsyncClient(
        PoolingNHttpClientConnectionManager connManager,
        ClientConfiguration config) {
    final HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
    builder.setConnectionManager(connManager);

    // Configure the proxy, if one is set.
    final String proxyHost = config.getProxyHost();
    final int proxyPort = config.getProxyPort();
    final boolean proxyConfigured = proxyHost != null && proxyPort > 0;
    if (proxyConfigured) {
        final HttpHost proxy = new HttpHost(proxyHost, proxyPort);
        builder.setProxy(proxy);

        final String proxyUsername = config.getProxyUsername();
        final String proxyPassword = config.getProxyPassword();
        if (proxyUsername != null && proxyPassword != null) {
            final String proxyDomain = config.getProxyDomain();
            final String proxyWorkstation = config.getProxyWorkstation();
            final NTCredentials ntCredentials =
                    new NTCredentials(proxyUsername, proxyPassword, proxyWorkstation, proxyDomain);
            final CredentialsProvider proxyCredentials = new BasicCredentialsProvider();
            proxyCredentials.setCredentials(new AuthScope(proxy), ntCredentials);
            builder.setDefaultCredentialsProvider(proxyCredentials);
        }
    }

    final RequestConfig.Builder requestDefaults = RequestConfig.custom();
    requestDefaults.setCookieSpec(CookieSpecs.BEST_MATCH);
    // This value is replaced by the configured one in the last call below.
    requestDefaults.setExpectContinueEnabled(true);
    requestDefaults.setStaleConnectionCheckEnabled(true);
    requestDefaults.setTargetPreferredAuthSchemes(Arrays.asList(AuthSchemes.NTLM, AuthSchemes.DIGEST));
    requestDefaults.setProxyPreferredAuthSchemes(Arrays.asList(AuthSchemes.BASIC));
    requestDefaults.setConnectTimeout(config.getConnectionTimeout());
    requestDefaults.setSocketTimeout(config.getSocketTimeout());
    requestDefaults.setExpectContinueEnabled(config.isExceptContinue());
    builder.setDefaultRequestConfig(requestDefaults.build());

    builder.setMaxConnPerRoute(config.getMaxConnectionsPerRoute());
    builder.setMaxConnTotal(config.getMaxConnections());
    builder.setUserAgent(VersionInfoUtils.getDefaultUserAgent());
    return builder.build();
}
/**
 * Builds an asynchronous HTTP client on the given connection manager with proxy, request and
 * pool settings taken from {@code config}.
 *
 * <p>A proxy is installed only when a proxy host and a positive proxy port are configured; NT
 * credentials are registered for it only when both a proxy username and a proxy password are
 * present. The returned client is not started here.
 *
 * @param connManager connection pool manager the client will use
 * @param config source of proxy, timeout and connection-limit settings
 * @return the built client
 */
public static CloseableHttpAsyncClient createHttpAsyncClient(
        PoolingNHttpClientConnectionManager connManager,
        ClientConfiguration config) {
    final HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
    builder.setConnectionManager(connManager);

    // Configure the proxy, if one is set.
    final String proxyHost = config.getProxyHost();
    final int proxyPort = config.getProxyPort();
    final boolean proxyConfigured = proxyHost != null && proxyPort > 0;
    if (proxyConfigured) {
        final HttpHost proxy = new HttpHost(proxyHost, proxyPort);
        builder.setProxy(proxy);

        final String proxyUsername = config.getProxyUsername();
        final String proxyPassword = config.getProxyPassword();
        if (proxyUsername != null && proxyPassword != null) {
            final String proxyDomain = config.getProxyDomain();
            final String proxyWorkstation = config.getProxyWorkstation();
            final NTCredentials ntCredentials =
                    new NTCredentials(proxyUsername, proxyPassword, proxyWorkstation, proxyDomain);
            final CredentialsProvider proxyCredentials = new BasicCredentialsProvider();
            proxyCredentials.setCredentials(new AuthScope(proxy), ntCredentials);
            builder.setDefaultCredentialsProvider(proxyCredentials);
        }
    }

    final RequestConfig.Builder requestDefaults = RequestConfig.custom();
    requestDefaults.setCookieSpec(CookieSpecs.BEST_MATCH);
    // This value is replaced by the configured one in the last call below.
    requestDefaults.setExpectContinueEnabled(true);
    requestDefaults.setStaleConnectionCheckEnabled(true);
    requestDefaults.setTargetPreferredAuthSchemes(Arrays.asList(AuthSchemes.NTLM, AuthSchemes.DIGEST));
    requestDefaults.setProxyPreferredAuthSchemes(Arrays.asList(AuthSchemes.BASIC));
    requestDefaults.setConnectTimeout(config.getConnectionTimeout());
    requestDefaults.setSocketTimeout(config.getSocketTimeout());
    requestDefaults.setExpectContinueEnabled(config.isExceptContinue());
    builder.setDefaultRequestConfig(requestDefaults.build());

    builder.setMaxConnPerRoute(config.getMaxConnectionsPerRoute());
    builder.setMaxConnTotal(config.getMaxConnections());
    builder.setUserAgent(VersionInfoUtils.getDefaultUserAgent());
    return builder.build();
}
/**
 * Creates the factory around a caller-supplied connection manager and then runs
 * {@code initHttpClient()}.
 *
 * @param connectionManager connection manager stored as-is; no pool limits are applied here
 */
public HttpAsyncClientFactory(PoolingNHttpClientConnectionManager connectionManager) {
this.connectionManager = connectionManager;
initHttpClient();
}
/**
 * Builds the asynchronous client on top of the given connection pool using the settings in
 * {@code config}.
 *
 * <p>A positive pool size is applied as both the total and the per-route limit, after which
 * expired connections are closed. A positive keep-alive time installs a keep-alive strategy, a
 * keep-alive time of exactly zero installs a connection-reuse strategy, and a negative value
 * installs neither. A positive connection live time installs a callback executor as the event
 * handler. The returned client is not started here.
 *
 * @param config client configuration
 * @param cm connection pool manager; its limits may be modified by this call
 * @return the built client
 * @throws HttpClientInitException declared by the signature; no statement visible here throws it
 *         directly
 */
private static CloseableHttpAsyncClient createPoolingHttpClient(
        Config config, PoolingNHttpClientConnectionManager cm) throws HttpClientInitException {
    final int poolSize = config.getHttpConnectionPool();
    final int connectionLiveTime = config.getHttpConnectionLiveTime();
    final int keepaliveTime = config.getHttpKeepaliveTime();
    final RequestConfig requestDefaults = initRequestConfig(config);

    if (poolSize > 0) {
        cm.setMaxTotal(poolSize);
        cm.setDefaultMaxPerRoute(poolSize);
        cm.closeExpiredConnections();
    }

    final HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
    // Connection manager.
    builder.setConnectionManager(cm);
    // Default request configuration.
    if (requestDefaults != null) {
        builder.setDefaultRequestConfig(requestDefaults);
    }

    // Keep-alive handling.
    // NOTE(review): the keep-alive strategy receives the connection live time, not the
    // keep-alive time - confirm this is intended.
    if (keepaliveTime > 0) {
        builder.setKeepAliveStrategy(new HiTSDBConnectionKeepAliveStrategy(connectionLiveTime));
    } else if (keepaliveTime == 0) {
        builder.setConnectionReuseStrategy(new HiTSDBConnectionReuseStrategy());
    }

    // Automatic connection closing.
    if (connectionLiveTime > 0) {
        builder.setEventHandler(new TSDBHttpAsyncCallbackExecutor(connectionLiveTime));
    }

    return builder.build();
}
/**
 * Creates the instance by passing the connection manager to the superclass constructor.
 *
 * @param manager pooling connection manager handed to the superclass
 */
public IdleNConnectionMonitorThread(PoolingNHttpClientConnectionManager manager) {
super(manager);
}