下面列出了怎么用org.apache.http.impl.nio.reactor.IOReactorConfig的API类实例代码及写法,或者点击链接到github查看源代码。
private static IOReactorConfig getDefaultReactorConfig() {
IOReactorConfig.Builder builder = IOReactorConfig.custom();
return builder.setSelectInterval(
HL7Configuration.getInstance().getIntProperty(MLLPConstants.TCPConstants.SELECT_INTERVAL, 1000))
.setShutdownGracePeriod(HL7Configuration.getInstance()
.getIntProperty(MLLPConstants.TCPConstants.SHUTDOWN_GRACE_PERIOD, 500))
.setIoThreadCount(HL7Configuration.getInstance()
.getIntProperty(MLLPConstants.TCPConstants.IO_THREAD_COUNT,
Runtime.getRuntime().availableProcessors()))
.setSoTimeout(HL7Configuration.getInstance().getIntProperty(MLLPConstants.TCPConstants.SO_TIMEOUT, 0))
.setSoKeepAlive(HL7Configuration.getInstance()
.getBooleanProperty(MLLPConstants.TCPConstants.SO_KEEP_ALIVE, true))
.setTcpNoDelay(HL7Configuration.getInstance()
.getBooleanProperty(MLLPConstants.TCPConstants.TCP_NO_DELAY, true))
.setConnectTimeout(
HL7Configuration.getInstance().getIntProperty(MLLPConstants.TCPConstants.CONNECT_TIMEOUT, 0))
.setRcvBufSize(HL7Configuration.getInstance().getIntProperty(MLLPConstants.TCPConstants.SO_RCVBUF, 0))
.setSndBufSize(HL7Configuration.getInstance().getIntProperty(MLLPConstants.TCPConstants.SO_SNDBUF, 0))
.setInterestOpQueued(false).setSoReuseAddress(true).setSoLinger(-1).build();
}
public HttpAsyncClientFactory() {
try {
int ioThreadCount = Runtime.getRuntime().availableProcessors() * 8;
IOReactorConfig ioReactorConfig =
IOReactorConfig.custom().setIoThreadCount(ioThreadCount)
.setConnectTimeout(httpConfig.getConnectTimeout()).setSoTimeout(httpConfig.getReadTimeout()).build();
ConnectingIOReactor ioReactor =
new DefaultConnectingIOReactor(ioReactorConfig, new NamedThreadFactory("flower-http"));
this.connectionManager = new PoolingNHttpClientConnectionManager(ioReactor);
this.connectionManager.setMaxTotal(1024);
this.connectionManager.setDefaultMaxPerRoute(256);
initHttpClient();
} catch (Exception e) {
logger.error("", e);
}
}
private CloseableHttpAsyncClient createHttpAsyncClient(YunpianConf conf) throws IOReactorException {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(Runtime.getRuntime().availableProcessors())
.setConnectTimeout(conf.getConfInt(YunpianConf.HTTP_CONN_TIMEOUT, "10000"))
.setSoTimeout(conf.getConfInt(YunpianConf.HTTP_SO_TIMEOUT, "30000")).build();
ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor);
ConnectionConfig connectionConfig = ConnectionConfig.custom().setMalformedInputAction(CodingErrorAction.IGNORE)
.setUnmappableInputAction(CodingErrorAction.IGNORE)
.setCharset(Charset.forName(conf.getConf(YunpianConf.HTTP_CHARSET, YunpianConf.HTTP_CHARSET_DEFAULT))).build();
connManager.setDefaultConnectionConfig(connectionConfig);
connManager.setMaxTotal(conf.getConfInt(YunpianConf.HTTP_CONN_MAXTOTAL, "100"));
connManager.setDefaultMaxPerRoute(conf.getConfInt(YunpianConf.HTTP_CONN_MAXPERROUTE, "10"));
CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(connManager).build();
httpclient.start();
return httpclient;
}
public FiberApacheHttpClientRequestExecutor(final Validator<CloseableHttpResponse> resValidator, final int maxConnections, final int timeout, final int parallelism) throws IOReactorException {
final DefaultConnectingIOReactor ioreactor = new DefaultConnectingIOReactor(IOReactorConfig.custom().
setConnectTimeout(timeout).
setIoThreadCount(parallelism).
setSoTimeout(timeout).
build());
final PoolingNHttpClientConnectionManager mngr = new PoolingNHttpClientConnectionManager(ioreactor);
mngr.setDefaultMaxPerRoute(maxConnections);
mngr.setMaxTotal(maxConnections);
final CloseableHttpAsyncClient ahc = HttpAsyncClientBuilder.create().
setConnectionManager(mngr).
setDefaultRequestConfig(RequestConfig.custom().setLocalAddress(null).build()).build();
client = new FiberHttpClient(ahc);
validator = resValidator;
}
protected CloseableHttpAsyncClient buildDefaultHttpAsyncClient() {
HttpHost httpProxy = resolveProxy(proxy);
return HttpAsyncClientBuilder.create()
.setMaxConnTotal(maxConnections)
.setMaxConnPerRoute(maxConnections)
.setProxy(httpProxy)
.setDefaultIOReactorConfig(IOReactorConfig.custom()
.setConnectTimeout(connectTimeout)
.setSoTimeout(readTimeout)
.build())
.setDefaultRequestConfig(RequestConfig.custom()
.setConnectTimeout(connectTimeout)
.setSocketTimeout(readTimeout)
.setProxy(httpProxy)
.build())
.setDefaultHeaders(Arrays.asList(new BasicHeader("Accept-Encoding", "gzip")))
.build();
}
public static void start() throws IOException {
if (reactor != null && reactor.getStatus().equals(IOReactorStatus.ACTIVE)) {
return;
}
IOReactorConfig config = getDefaultReactorConfig();
reactor = new DefaultListeningIOReactor(config);
Thread reactorThread = new Thread(new Runnable() {
@Override
public void run() {
try {
isStarted = true;
multiIOHandler = new MultiIOHandler(processorMap);
log.info("MLLP Transport IO Reactor Started");
reactor.execute(multiIOHandler);
} catch (IOException e) {
isStarted = false;
log.error("Error while starting the MLLP Transport IO Reactor.", e);
}
}
});
reactorThread.start();
}
public MetricFetcher() {
int cores = Runtime.getRuntime().availableProcessors() * 2;
long keepAliveTime = 0;
int queueSize = 2048;
RejectedExecutionHandler handler = new DiscardPolicy();
fetchService = new ThreadPoolExecutor(cores, cores,
keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize),
new NamedThreadFactory("sentinel-dashboard-metrics-fetchService"), handler);
fetchWorker = new ThreadPoolExecutor(cores, cores,
keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize),
new NamedThreadFactory("sentinel-dashboard-metrics-fetchWorker"), handler);
IOReactorConfig ioConfig = IOReactorConfig.custom()
.setConnectTimeout(3000)
.setSoTimeout(3000)
.setIoThreadCount(Runtime.getRuntime().availableProcessors() * 2)
.build();
httpclient = HttpAsyncClients.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(final String method) {
return false;
}
}).setMaxConnTotal(4000)
.setMaxConnPerRoute(1000)
.setDefaultIOReactorConfig(ioConfig)
.build();
httpclient.start();
start();
}
public SentinelApiClient() {
IOReactorConfig ioConfig = IOReactorConfig.custom().setConnectTimeout(3000).setSoTimeout(10000)
.setIoThreadCount(Runtime.getRuntime().availableProcessors() * 2).build();
httpClient = HttpAsyncClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(final String method) {
return false;
}
}).setMaxConnTotal(4000).setMaxConnPerRoute(1000).setDefaultIOReactorConfig(ioConfig).build();
httpClient.start();
}
public void initialize(final NettyRpcProperties properties) throws Exception {
final CommonProperties cp = properties.getCommonProperties();
final CyclicBarrier barrier = new CyclicBarrier(2);
Executors.newCachedThreadPool().submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setIoThreadCount(CommonProperties.CPUS * 2)
.setConnectTimeout(Integer.parseInt(cp.getHttpConnectTimeout()))
.setSoTimeout(Integer.parseInt(cp.getHttpSocketTimeout()))
.setSndBufSize(Integer.parseInt(cp.getHttpSendBufSize()))
.setRcvBufSize(Integer.parseInt(cp.getHttpRcvBufSize()))
.setBacklogSize(Integer.parseInt(cp.getHttpBackLogSize()))
.setTcpNoDelay(true)
.setSoReuseAddress(true)
.setSoKeepAlive(true)
.build();
ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
PoolingNHttpClientConnectionManager httpManager = new PoolingNHttpClientConnectionManager(ioReactor);
httpManager.setMaxTotal(Integer.parseInt(cp.getHttpMaxTotal()));
httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(httpManager).build();
httpAsyncClient.start();
LOG.info("Create apache async client successfully");
barrier.await();
} catch (IOReactorException e) {
LOG.error("Create apache async client failed", e);
}
return null;
}
});
barrier.await();
}
public MetricFetcher() {
int cores = Runtime.getRuntime().availableProcessors() * 2;
long keepAliveTime = 0;
int queueSize = 2048;
RejectedExecutionHandler handler = new DiscardPolicy();
fetchService = new ThreadPoolExecutor(cores, cores,
keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize),
new NamedThreadFactory("sentinel-dashboard-metrics-fetchService"), handler);
fetchWorker = new ThreadPoolExecutor(cores, cores,
keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize),
new NamedThreadFactory("sentinel-dashboard-metrics-fetchWorker"), handler);
IOReactorConfig ioConfig = IOReactorConfig.custom()
.setConnectTimeout(3000)
.setSoTimeout(3000)
.setIoThreadCount(Runtime.getRuntime().availableProcessors() * 2)
.build();
httpclient = HttpAsyncClients.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(final String method) {
return false;
}
}).setMaxConnTotal(4000)
.setMaxConnPerRoute(1000)
.setDefaultIOReactorConfig(ioConfig)
.build();
httpclient.start();
start();
}
public SentinelApiClient() {
IOReactorConfig ioConfig = IOReactorConfig.custom().setConnectTimeout(3000).setSoTimeout(10000)
.setIoThreadCount(Runtime.getRuntime().availableProcessors() * 2).build();
httpClient = HttpAsyncClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(final String method) {
return false;
}
}).setMaxConnTotal(4000).setMaxConnPerRoute(1000).setDefaultIOReactorConfig(ioConfig).build();
httpClient.start();
}
public void initialize() throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(2);
Executors.newCachedThreadPool().submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setIoThreadCount(CPUS * 2)
.setConnectTimeout(Integer.parseInt(HTTPCLIENT_CONNCT_TIMEOUT_DEFAULT))
.setSoTimeout(Integer.parseInt(HTTPCLIENT_SOCKET_TIMEOUT_DEFAULT))
.setSndBufSize(Integer.parseInt(HTTPCLIENT_SEDBUFSIZE_DEFAULT))
.setRcvBufSize(Integer.parseInt(HTTPCLIENT_RCV_BUFSIZE_DEFAULT))
.setBacklogSize(Integer.parseInt(HTTPCLIENT_BACK_LOG_SIZE_DEFAULT))
.setTcpNoDelay(true)
.setSoReuseAddress(true)
.setSoKeepAlive(true)
.build();
ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
PoolingNHttpClientConnectionManager httpManager = new PoolingNHttpClientConnectionManager(ioReactor);
httpManager.setMaxTotal(Integer.parseInt(HTTPCLIENT_MAX_TOTAL_DEFAULT));
httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(httpManager).build();
httpAsyncClient.start();
LOG.info("Create apache async client successfully");
isStarted=true;
barrier.await();
} catch (IOReactorException e) {
LOG.error("Create apache async client failed", e);
}
return null;
}
});
barrier.await();
}
public AsyncInternalClient(Config config){
this.config = config;
try {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom().build();
ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
PoolingNHttpClientConnectionManager cm = new PoolingNHttpClientConnectionManager(ioReactor);
cm.setMaxTotal(config.getMaxConnectCount());
cm.setDefaultMaxPerRoute(config.getMaxPerRoute());
httpClient = createHttpAsyncClient(config, cm);
httpClient.start();
} catch (IOReactorException e) {
throw new ClientException(e);
}
}
protected static ConnectingIOReactor createConnectingIOReactor(int ioThreadCount) {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(ioThreadCount).setTcpNoDelay(true).build();
try {
return new DefaultConnectingIOReactor(ioReactorConfig);
} catch (IOReactorException ex) {
throw new IOReactorRuntimeException(ex);
}
}
@Autowired
public EsRestClient(JkesSearchProperties jkesProperties) {
SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();
Header[] defaultHeaders = {new BasicHeader("Content-Type", "application/json")};
String[] urls = jkesProperties.getEs().getServers().split("\\s*,");
HttpHost[] hosts = new HttpHost[urls.length];
for (int i = 0; i < urls.length; i++) {
hosts[i] = HttpHost.create(urls[i]);
}
RestClient restClient = RestClient.builder(hosts)
.setRequestConfigCallback(requestConfigBuilder -> {
return requestConfigBuilder.setConnectTimeout(5000) // default 1s
.setSocketTimeout(60000); // defaults to 30 seconds
}).setHttpClientConfigCallback(httpClientBuilder -> {
return httpClientBuilder.setDefaultIOReactorConfig(
IOReactorConfig.custom().setIoThreadCount(2).build()); // because only used for admin, so not necessary to hold many worker threads
})
.setMaxRetryTimeoutMillis(60000) // defaults to 30 seconds
.setDefaultHeaders(defaultHeaders)
.setFailureListener(sniffOnFailureListener)
.build();
Sniffer sniffer = Sniffer.builder(restClient).build();
sniffOnFailureListener.setSniffer(sniffer);
this.sniffer = sniffer;
this.restClient = restClient;
}
@Inject
public EsRestClient(JkesProperties jkesProperties) {
SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();
Header[] defaultHeaders = {new BasicHeader("Content-Type", "application/json")};
String[] urls = jkesProperties.getEsBootstrapServers().split("\\s*,");
HttpHost[] hosts = new HttpHost[urls.length];
for (int i = 0; i < urls.length; i++) {
hosts[i] = HttpHost.create(urls[i]);
}
RestClient restClient = RestClient.builder(hosts)
.setRequestConfigCallback(requestConfigBuilder -> {
return requestConfigBuilder.setConnectTimeout(5000) // default 1s
.setSocketTimeout(60000); // defaults to 30 seconds
}).setHttpClientConfigCallback(httpClientBuilder -> {
return httpClientBuilder.setDefaultIOReactorConfig(
IOReactorConfig.custom().setIoThreadCount(2).build()); // because only used for admin, so not necessary to hold many worker threads
})
.setMaxRetryTimeoutMillis(60000) // defaults to 30 seconds
.setDefaultHeaders(defaultHeaders)
.setFailureListener(sniffOnFailureListener)
.build();
Sniffer sniffer = Sniffer.builder(restClient).build();
sniffOnFailureListener.setSniffer(sniffer);
this.sniffer = sniffer;
this.restClient = restClient;
}
public static CloseableHttpAsyncClient getCloseableHttpAsyncClient() throws IOReactorException {
if (closeableHttpAsyncClient == null) {
synchronized (WxMpTemplateMsgSender.class) {
if (closeableHttpAsyncClient == null) {
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(-1)
.setSocketTimeout(-1)
.setConnectionRequestTimeout(-1)
.build();
//配置io线程
IOReactorConfig ioReactorConfig = IOReactorConfig.custom().
setIoThreadCount(Runtime.getRuntime().availableProcessors())
.setSoKeepAlive(true).setConnectTimeout(-1).setSoTimeout(-1)
.build();
//设置连接池大小
ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor);
//最大连接数
connManager.setMaxTotal(5000);
//per route最大连接数
connManager.setDefaultMaxPerRoute(5000);
closeableHttpAsyncClient = HttpAsyncClients.custom().
setConnectionManager(connManager)
.setDefaultRequestConfig(requestConfig)
.build();
closeableHttpAsyncClient.start();
}
}
}
return closeableHttpAsyncClient;
}
IOReactorConfig createIoReactorConfig() {
HttpClientConfig httpClientConfig = wrappedHttpClientConfig.getHttpClientConfig();
return IOReactorConfig.custom()
.setConnectTimeout(httpClientConfig.getConnTimeout())
.setSoTimeout(httpClientConfig.getReadTimeout())
.setIoThreadCount(wrappedHttpClientConfig.getIoThreadCount())
.build();
}
IOReactorConfig createIOReactorConfig() {
return IOReactorConfig.custom()
.setConnectTimeout(connTimeout)
.setSoTimeout(readTimeout)
.setIoThreadCount(ioThreadCount)
.build();
}
@Override
protected void start() throws IOException {
if (!portFree()) {
getLogger().warning("Failed to initialize " + getName() + ". Port " + port + " is not free.");
throw new IOException("Port " + port + " is not free.");
}
handlers = createHandlers();
taskActivationConstructorManager.start();
manuallyBuildActionConstructorManager.start();
setMainBackEndHolder(backEndHolder);
ServerBootstrap serverBootstrap = ServerBootstrap.bootstrap()
.setLocalAddress(InetAddress.getByName("localhost"))
.setIOReactorConfig(IOReactorConfig.custom().setSoReuseAddress(true).build())
.setListenerPort(port)
.setServerInfo("Repeat")
.setExceptionLogger(ExceptionLogger.STD_ERR)
.registerHandler("/test", new UpAndRunningHandler())
.registerHandler("/static/*", new StaticFileServingHandler(BootStrapResources.getWebUIResource().getStaticDir().getAbsolutePath()));
for (Entry<String, HttpHandlerWithBackend> entry : handlers.entrySet()) {
serverBootstrap.registerHandler(entry.getKey(), entry.getValue());
}
server = serverBootstrap.create();
mainThread = new Thread() {
@Override
public void run() {
try {
server.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {
getLogger().log(Level.SEVERE, "Interrupted when waiting for UI server.", e);
}
getLogger().info("Finished waiting for UI server termination...");
}
};
server.start();
mainThread.start();
getLogger().info("UI server up and running...");
}
public AsyncServiceClient(ClientConfiguration config) {
try {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setIoThreadCount(config.getIoThreadCount()).build();
ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(
ioReactorConfig);
PoolingNHttpClientConnectionManager cm =
new PoolingNHttpClientConnectionManager(ioReactor);
cm.setMaxTotal(config.getMaxConnections());
cm.setDefaultMaxPerRoute(config.getMaxConnections());
httpClient = HttpFactory.createHttpAsyncClient(config, cm);
/*
* socketTimeout的值限制了closeIdleConnections执行的周期。
* 如果周期相对socketTimeout的值过长,有可能一个请求分配到一个即将socketTimeout的连接,
* 在请求发送之前即抛出SocketTimeoutException。
* 现在让closeIdleConnections的执行周期为socketTimeout / 2.5。
*/
long closePeriod = 5000;
if (config.getSocketTimeoutInMillisecond() > 0) {
closePeriod = (long) (config.getSocketTimeoutInMillisecond() / 2.5);
}
closePeriod = closePeriod < 5000 ? closePeriod : 5000;
connEvictor = new IdleConnectionEvictor(cm, closePeriod);
httpClient.start();
connEvictor.start();
} catch (IOReactorException ex) {
throw new ClientException(String.format("IOReactorError: %s",
ex.getMessage()), ex);
}
}
public void initialize(final ThunderProperties properties) throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(2);
Executors.newCachedThreadPool().submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setIoThreadCount(ThunderConstant.CPUS)
.setConnectTimeout(properties.getInteger(ThunderConstant.APACHE_CONNECT_TIMEOUT_ATTRIBUTE_NAME))
.setSoTimeout(properties.getInteger(ThunderConstant.APACHE_SO_TIMEOUT_ATTRIBUTE_NAME))
.setSndBufSize(properties.getInteger(ThunderConstant.APACHE_SNDBUF_SIZE_ATTRIBUTE_NAME))
.setRcvBufSize(properties.getInteger(ThunderConstant.APACHE_RCVBUF_SIZE_ATTRIBUTE_NAME))
.setBacklogSize(properties.getInteger(ThunderConstant.APACHE_BACKLOG_SIZE_ATTRIBUTE_NAME))
.setTcpNoDelay(true)
.setSoReuseAddress(true)
.setSoKeepAlive(true)
.build();
ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
PoolingNHttpClientConnectionManager httpManager = new PoolingNHttpClientConnectionManager(ioReactor);
httpManager.setMaxTotal(ThunderConstant.CPUS * properties.getInteger(ThunderConstant.APACHE_MAX_TOTAL_ATTRIBUTE_NAME));
httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(httpManager).build();
httpAsyncClient.start();
LOG.info("Create apache async client successfully");
barrier.await();
} catch (IOReactorException e) {
LOG.error("Create apache async client failed", e);
}
return null;
}
});
barrier.await(properties.getLong(ThunderConstant.APACHE_CONNECT_TIMEOUT_ATTRIBUTE_NAME) * 2, TimeUnit.MILLISECONDS);
}
/**
* Creates config with setting for num connection threads. Default is ES client default,
* which is 1 to num processors per the documentation.
* https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_number_of_threads.html
*/
private static IOReactorConfig getIOReactorConfig(ElasticsearchClientConfig esClientConfig) {
if (esClientConfig.getNumClientConnectionThreads().isPresent()) {
Integer numThreads = esClientConfig.getNumClientConnectionThreads().get();
LOG.info("Setting number of client connection threads: {}", numThreads);
return IOReactorConfig.custom().setIoThreadCount(numThreads).build();
} else {
return IOReactorConfig.DEFAULT;
}
}
/**
* Create connection manager for asynchronous http client.
*
* @return Connection manager for asynchronous http client.
* @throws IOReactorException in case if a non-recoverable I/O error.
*/
protected NHttpClientConnectionManager createNHttpClientConnectionManager() throws IOReactorException {
ConnectingIOReactor ioReactor =
new DefaultConnectingIOReactor(IOReactorConfig.custom()
.setSoTimeout(this.config.getSocketTimeoutInMillis()).setTcpNoDelay(true).build());
PoolingNHttpClientConnectionManager connectionManager = new PoolingNHttpClientConnectionManager(ioReactor);
connectionManager.setDefaultMaxPerRoute(this.config.getMaxConnections());
connectionManager.setMaxTotal(this.config.getMaxConnections());
return connectionManager;
}
private ConnectingIOReactor ioReactor() throws Exception {
// Create I/O reactor configuration
Map<String, Object> asyncRestTemplateMap = (Map)configMap.get(ASYNC_REST_TEMPLATE);
Map<String, Object> reactorMap = (Map)asyncRestTemplateMap.get(REACTOR);
Integer ioThreadCount = (Integer)reactorMap.get(REACTOR_IO_THREAD_COUNT);
IOReactorConfig.Builder builder = IOReactorConfig.custom();
builder.setIoThreadCount(ioThreadCount == null? Runtime.getRuntime().availableProcessors(): ioThreadCount);
Integer connectTimeout = (Integer)reactorMap.get(REACTOR_CONNECT_TIMEOUT);
builder.setConnectTimeout(connectTimeout == null? DEFAULT_REACTOR_CONNECT_TIMEOUT: connectTimeout);
Integer soTimeout = (Integer)reactorMap.get(REACTOR_SO_TIMEOUT);
builder.setSoTimeout(soTimeout == null? DEFAULT_REACTOR_SO_TIMEOUT: soTimeout);
IOReactorConfig ioReactorConfig = builder.build();
return new DefaultConnectingIOReactor(ioReactorConfig);
}
private void initialize() {
if (initialized.getAndSet(true)) {
return;
}
IOReactorConfig.Builder config = createConfig();
// params.setParameter(CoreProtocolPNames.USER_AGENT, "jsonrpc4j/1.0");
final ConnectingIOReactor ioReactor = createIoReactor(config);
createSslContext();
int socketBufferSize = Integer.getInteger("com.googlecode.jsonrpc4j.async.socket.buffer", 8 * 1024);
final ConnectionConfig connectionConfig = ConnectionConfig.custom().setBufferSize(socketBufferSize).build();
BasicNIOConnFactory nioConnFactory = new BasicNIOConnFactory(sslContext, null, connectionConfig);
pool = new BasicNIOConnPool(ioReactor, nioConnFactory, Integer.getInteger("com.googlecode.jsonrpc4j.async.connect.timeout", 30000));
pool.setDefaultMaxPerRoute(Integer.getInteger("com.googlecode.jsonrpc4j.async.max.inflight.route", 500));
pool.setMaxTotal(Integer.getInteger("com.googlecode.jsonrpc4j.async.max.inflight.total", 500));
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();
IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler, sslContext, connectionConfig);
ioReactor.execute(ioEventDispatch);
} catch (InterruptedIOException ex) {
System.err.println("Interrupted");
} catch (IOException e) {
System.err.println("I/O error: " + e.getMessage());
}
}
}, "jsonrpc4j HTTP IOReactor");
t.setDaemon(true);
t.start();
HttpProcessor httpProcessor = new ImmutableHttpProcessor(new RequestContent(), new RequestTargetHost(), new RequestConnControl(), new RequestUserAgent(), new RequestExpectContinue(false));
requester = new HttpAsyncRequester(httpProcessor, new DefaultConnectionReuseStrategy());
}
private IOReactorConfig.Builder createConfig() {
IOReactorConfig.Builder config = IOReactorConfig.custom();
config = config.setSoTimeout(Integer.getInteger("com.googlecode.jsonrpc4j.async.socket.timeout", 30000));
config = config.setConnectTimeout(Integer.getInteger("com.googlecode.jsonrpc4j.async.connect.timeout", 30000));
config = config.setTcpNoDelay(Boolean.valueOf(System.getProperty("com.googlecode.jsonrpc4j.async.tcp.nodelay", "true")));
config = config.setIoThreadCount(Integer.getInteger("com.googlecode.jsonrpc4j.async.reactor.threads", 1));
return config;
}
private NHttpFileServer start() throws KeyManagementException, UnrecoverableKeyException, NoSuchAlgorithmException,
KeyStoreException, CertificateException, IOException, InterruptedException {
SSLContext sslContext = null;
if (port == 8443) {
// Initialize SSL context
final URL url = NHttpFileServer.class.getResource("/test.keystore");
if (url == null) {
debug("Keystore not found");
System.exit(1);
}
debug("Loading keystore " + url);
sslContext = SSLContexts.custom()
.loadKeyMaterial(url, "nopassword".toCharArray(), "nopassword".toCharArray()).build();
}
final IOReactorConfig config = IOReactorConfig.custom().setSoTimeout(15000).setTcpNoDelay(true).build();
// @formatter:off
server = ServerBootstrap.bootstrap()
.setListenerPort(port)
.setServerInfo("Test/1.1")
.setIOReactorConfig(config)
.setSslContext(sslContext)
.setExceptionLogger(ExceptionLogger.STD_ERR)
.registerHandler("*", new HttpFileHandler(docRoot)).create();
// @formatter:on
server.start();
debug("Serving " + docRoot + " on " + server.getEndpoint().getAddress()
+ (sslContext == null ? "" : " with " + sslContext.getProvider() + " " + sslContext.getProtocol()));
server.getEndpoint().waitFor();
// Thread.sleep(startWaitMillis); // hack
return this;
}
private static RestHighLevelClient createClient(
ElasticsearchConfig config,
Optional<AwsSecurityConfig> awsSecurityConfig,
Optional<PasswordConfig> passwordConfig)
{
RestClientBuilder builder = RestClient.builder(
new HttpHost(config.getHost(), config.getPort(), config.isTlsEnabled() ? "https" : "http"))
.setMaxRetryTimeoutMillis(toIntExact(config.getMaxRetryTime().toMillis()));
builder.setHttpClientConfigCallback(ignored -> {
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(toIntExact(config.getConnectTimeout().toMillis()))
.setSocketTimeout(toIntExact(config.getRequestTimeout().toMillis()))
.build();
IOReactorConfig reactorConfig = IOReactorConfig.custom()
.setIoThreadCount(config.getHttpThreadCount())
.build();
// the client builder passed to the call-back is configured to use system properties, which makes it
// impossible to configure concurrency settings, so we need to build a new one from scratch
HttpAsyncClientBuilder clientBuilder = HttpAsyncClientBuilder.create()
.setDefaultRequestConfig(requestConfig)
.setDefaultIOReactorConfig(reactorConfig)
.setMaxConnPerRoute(config.getMaxHttpConnections())
.setMaxConnTotal(config.getMaxHttpConnections());
if (config.isTlsEnabled()) {
buildSslContext(config.getKeystorePath(), config.getKeystorePassword(), config.getTrustStorePath(), config.getTruststorePassword())
.ifPresent(clientBuilder::setSSLContext);
if (config.isVerifyHostnames()) {
clientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
}
}
passwordConfig.ifPresent(securityConfig -> {
CredentialsProvider credentials = new BasicCredentialsProvider();
credentials.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(securityConfig.getUser(), securityConfig.getPassword()));
clientBuilder.setDefaultCredentialsProvider(credentials);
});
awsSecurityConfig.ifPresent(securityConfig -> clientBuilder.addInterceptorLast(new AwsRequestSigner(
securityConfig.getRegion(),
getAwsCredentialsProvider(securityConfig))));
return clientBuilder;
});
return new RestHighLevelClient(builder);
}
@Test
public void ioReactorConfigUsesGivenIoThreadCount() {
// given
WrappedHttpClientConfig.Builder builder = createDefaultTestWrappedHttpClientConfigBuilder();
int expectedIoThreadCount = random.nextInt(16) + 1;
builder.ioThreadCount(expectedIoThreadCount);
ExtendedJestClientFactory factory = new ExtendedJestClientFactory(builder.build());
// when
IOReactorConfig ioReactorConfig = factory.createIoReactorConfig();
// then
assertEquals(expectedIoThreadCount, ioReactorConfig.getIoThreadCount());
}