下面列出了 org.apache.http.nio.reactor.IOReactorException 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Creates the client and starts its HTTP client. Unless the configuration is
 * read-only, also sets up the batch-put queue with its producer and consumer,
 * starts the consumer, and makes the {@code delete} field of {@code Query}
 * reflectively accessible.
 *
 * @param config client configuration
 * @throws IOReactorException propagated from {@code HttpClientFactory.createHttpClient}
 */
public OpenTSDBClient(OpenTSDBConfig config) throws IOReactorException {
    this.config = config;
    this.httpClient = HttpClientFactory.createHttpClient(config);
    this.httpClient.start();
    if (!config.isReadonly()) {
        this.queue = new ArrayBlockingQueue<>(config.getBatchPutBufferSize());
        this.producer = new ProducerImpl(queue);
        this.consumer = new ConsumerImpl(queue, httpClient, config);
        this.consumer.start();
        try {
            // Look up Query's declared "delete" field and lift the access check on it.
            queryDeleteField = Query.class.getDeclaredField("delete");
            queryDeleteField.setAccessible(true);
        } catch (NoSuchFieldException e) {
            // Route the failure through the logger (with its cause) instead of stderr.
            log.error("Query class has no declared field named 'delete'", e);
        }
    }
    log.debug("the httpclient has started");
}
/**
 * Expects an {@code IOReactorException} thrown by {@code createIOReactor()} to surface
 * from the connection-manager factory method as an {@code IllegalStateException}
 * whose message contains the original message.
 */
@Test
public void unconfiguredNHttpConnectionManagerInitRethrowsISEOnIOReactorException() throws IOReactorException {

    // given
    ExtendedJestClientFactory factory = spy(
            new ExtendedJestClientFactory(createDefaultTestWrappedHttpClientConfigBuilder().build()));

    String expectedMessage = UUID.randomUUID().toString();
    when(factory.createIOReactor()).thenThrow(new IOReactorException(expectedMessage));

    expectedException.expect(IllegalStateException.class);
    expectedException.expectMessage(expectedMessage);

    // when
    factory.createUnconfiguredPoolingNHttpClientConnectionManager();

    // then
    // NOTE(review): this line is not reached when the call above throws as the test expects.
    verify(factory).createSchemeIOSessionStrategyRegistry();
}
/**
 * Expects {@code createInstance()} to fail with an {@code IllegalStateException}
 * carrying the message of the {@code IOReactorException} thrown by
 * {@code createIOReactor()}.
 */
@Test
public void throwsIllegalStateOnReactorException() throws IOReactorException {

    // given
    HttpClientFactory factory = spy(new HttpClientFactory.Builder().build());

    String expectedMessage = UUID.randomUUID().toString();
    when(factory.createIOReactor()).thenThrow(new IOReactorException(expectedMessage));

    expectedException.expect(IllegalStateException.class);
    expectedException.expectMessage(expectedMessage);

    // when
    factory.createInstance();
}
/**
 * Builds and starts an asynchronous HTTP client configured from {@code conf}.
 *
 * <p>The I/O reactor uses one I/O thread per available processor; connect and socket
 * timeouts, charset, and pool limits are read from {@code conf} with the defaults
 * given below.
 *
 * @param conf configuration source
 * @return a client on which {@code start()} has already been called
 * @throws IOReactorException if the connecting I/O reactor cannot be created
 */
private CloseableHttpAsyncClient createHttpAsyncClient(YunpianConf conf) throws IOReactorException {
    IOReactorConfig reactorSettings = IOReactorConfig.custom()
            .setIoThreadCount(Runtime.getRuntime().availableProcessors())
            .setConnectTimeout(conf.getConfInt(YunpianConf.HTTP_CONN_TIMEOUT, "10000"))
            .setSoTimeout(conf.getConfInt(YunpianConf.HTTP_SO_TIMEOUT, "30000"))
            .build();
    PoolingNHttpClientConnectionManager pool =
            new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(reactorSettings));

    // Malformed or unmappable input is ignored rather than reported.
    String charsetName = conf.getConf(YunpianConf.HTTP_CHARSET, YunpianConf.HTTP_CHARSET_DEFAULT);
    ConnectionConfig connectionSettings = ConnectionConfig.custom()
            .setMalformedInputAction(CodingErrorAction.IGNORE)
            .setUnmappableInputAction(CodingErrorAction.IGNORE)
            .setCharset(Charset.forName(charsetName))
            .build();
    pool.setDefaultConnectionConfig(connectionSettings);
    pool.setMaxTotal(conf.getConfInt(YunpianConf.HTTP_CONN_MAXTOTAL, "100"));
    pool.setDefaultMaxPerRoute(conf.getConfInt(YunpianConf.HTTP_CONN_MAXPERROUTE, "10"));

    CloseableHttpAsyncClient asyncClient = HttpAsyncClients.custom().setConnectionManager(pool).build();
    asyncClient.start();
    return asyncClient;
}
/**
 * Creates an executor backed by a pooled asynchronous HTTP client wrapped in a
 * {@code FiberHttpClient}.
 *
 * @param resValidator   validator stored for later use on responses
 * @param maxConnections used as both the per-route and the total connection limit
 * @param timeout        used as both the connect timeout and the socket timeout
 * @param parallelism    number of I/O threads for the reactor
 * @throws IOReactorException if the connecting I/O reactor cannot be created
 */
public FiberApacheHttpClientRequestExecutor(final Validator<CloseableHttpResponse> resValidator, final int maxConnections, final int timeout, final int parallelism) throws IOReactorException {
    final IOReactorConfig reactorConfig = IOReactorConfig.custom()
            .setConnectTimeout(timeout)
            .setIoThreadCount(parallelism)
            .setSoTimeout(timeout)
            .build();

    final PoolingNHttpClientConnectionManager pool =
            new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(reactorConfig));
    pool.setDefaultMaxPerRoute(maxConnections);
    pool.setMaxTotal(maxConnections);

    final RequestConfig defaultRequestConfig = RequestConfig.custom().setLocalAddress(null).build();
    final CloseableHttpAsyncClient asyncClient = HttpAsyncClientBuilder.create()
            .setConnectionManager(pool)
            .setDefaultRequestConfig(defaultRequestConfig)
            .build();

    client = new FiberHttpClient(asyncClient);
    validator = resValidator;
}
/**
 * Constructs a new BCE Http Client, optionally with asynchronous PUT support.
 *
 * <p>When async PUT is requested, an NIO connection manager and an async client are
 * created and the client is started. If that raises an {@code IOReactorException},
 * async PUT is left disabled instead of failing construction.
 *
 * @param config Configuration options specifying how this client will communicate with BCE (ex: proxy settings,
 *         retry count, etc.).
 * @param signer signer used to sign http requests
 * @param isHttpAsyncPutEnabled whether use Async for PUT method.
 */
public BceHttpClient(BceClientConfiguration config, Signer signer, boolean isHttpAsyncPutEnabled) {
    this(config, signer);

    if (!isHttpAsyncPutEnabled) {
        this.isHttpAsyncPutEnabled = false;
        return;
    }

    try {
        this.nioConnectionManager = this.createNHttpClientConnectionManager();
        this.httpAsyncClient = this.createHttpAsyncClient(this.nioConnectionManager);
        this.httpAsyncClient.start();
        this.isHttpAsyncPutEnabled = true;
    } catch (IOReactorException e) {
        // Async setup failed: fall back to running without async PUT.
        this.isHttpAsyncPutEnabled = false;
    }
}
/**
 * Creates and starts the Apache asynchronous HTTP client on a worker thread and
 * blocks until that work has finished.
 *
 * <p>The caller waits on the submitted task's {@code Future} rather than on a
 * barrier that the worker reaches only on success. A barrier would leave the caller
 * blocked forever whenever client creation failed; here any failure in the worker is
 * rethrown to the caller.
 *
 * @param properties source of the HTTP client settings (via its common properties)
 * @throws java.util.concurrent.ExecutionException if the worker failed; the cause is
 *         the original exception (for example an {@code IOReactorException} or a
 *         {@code NumberFormatException} from a malformed numeric setting)
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws Exception declared for backward compatibility with existing callers
 */
public void initialize(final NettyRpcProperties properties) throws Exception {
    final CommonProperties cp = properties.getCommonProperties();

    final java.util.concurrent.ExecutorService executor = Executors.newCachedThreadPool();
    try {
        executor.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
                            .setIoThreadCount(CommonProperties.CPUS * 2)
                            .setConnectTimeout(Integer.parseInt(cp.getHttpConnectTimeout()))
                            .setSoTimeout(Integer.parseInt(cp.getHttpSocketTimeout()))
                            .setSndBufSize(Integer.parseInt(cp.getHttpSendBufSize()))
                            .setRcvBufSize(Integer.parseInt(cp.getHttpRcvBufSize()))
                            .setBacklogSize(Integer.parseInt(cp.getHttpBackLogSize()))
                            .setTcpNoDelay(true)
                            .setSoReuseAddress(true)
                            .setSoKeepAlive(true)
                            .build();
                    ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);

                    PoolingNHttpClientConnectionManager httpManager = new PoolingNHttpClientConnectionManager(ioReactor);
                    httpManager.setMaxTotal(Integer.parseInt(cp.getHttpMaxTotal()));

                    httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(httpManager).build();
                    httpAsyncClient.start();

                    LOG.info("Create apache async client successfully");
                } catch (IOReactorException e) {
                    LOG.error("Create apache async client failed", e);
                    // Rethrow so the waiting caller is told about the failure.
                    throw e;
                }

                return null;
            }
        }).get();
    } finally {
        // The pool is used for this single task only; release its thread promptly.
        executor.shutdown();
    }
}
/**
 * Creates the HTTP client wrapper.
 *
 * <p>Builds a pooling NIO connection manager on top of the I/O reactor, creates the
 * async client from the request configuration, and wraps both together with the
 * periodic connection-closing helper returned by
 * {@code initFixedCycleCloseConnection}.
 *
 * @param config client configuration; must not be {@code null}
 * @return the assembled {@code HttpClient}
 * @throws IOReactorException declared by this method; presumably raised when the
 *         I/O reactor cannot be created
 * @throws NullPointerException if {@code config} is {@code null}
 */
public static HttpClient createHttpClient(OpenTSDBConfig config) throws IOReactorException {
    Objects.requireNonNull(config);

    PoolingNHttpClientConnectionManager pool =
            new PoolingNHttpClientConnectionManager(initIOReactorConfig());
    CloseableHttpAsyncClient asyncClient =
            createPoolingHttpClient(initRequestConfig(config), pool, config);

    return new HttpClient(config, asyncClient, initFixedCycleCloseConnection(pool));
}
/**
 * Creates and starts the Apache asynchronous HTTP client on a worker thread using
 * the default settings, and blocks until that work has finished.
 *
 * <p>The caller waits on the submitted task's {@code Future} rather than on a
 * barrier that the worker reaches only on success. A barrier would leave the caller
 * blocked forever whenever client creation failed; here any failure in the worker is
 * rethrown to the caller. {@code isStarted} is set to {@code true} only after the
 * client has been started.
 *
 * @throws java.util.concurrent.ExecutionException if the worker failed; the cause is
 *         the original exception (for example an {@code IOReactorException})
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws Exception declared for backward compatibility with existing callers
 */
public void initialize() throws Exception {
    final java.util.concurrent.ExecutorService executor = Executors.newCachedThreadPool();
    try {
        executor.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
                            .setIoThreadCount(CPUS * 2)
                            .setConnectTimeout(Integer.parseInt(HTTPCLIENT_CONNCT_TIMEOUT_DEFAULT))
                            .setSoTimeout(Integer.parseInt(HTTPCLIENT_SOCKET_TIMEOUT_DEFAULT))
                            .setSndBufSize(Integer.parseInt(HTTPCLIENT_SEDBUFSIZE_DEFAULT))
                            .setRcvBufSize(Integer.parseInt(HTTPCLIENT_RCV_BUFSIZE_DEFAULT))
                            .setBacklogSize(Integer.parseInt(HTTPCLIENT_BACK_LOG_SIZE_DEFAULT))
                            .setTcpNoDelay(true)
                            .setSoReuseAddress(true)
                            .setSoKeepAlive(true)
                            .build();
                    ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);

                    PoolingNHttpClientConnectionManager httpManager = new PoolingNHttpClientConnectionManager(ioReactor);
                    httpManager.setMaxTotal(Integer.parseInt(HTTPCLIENT_MAX_TOTAL_DEFAULT));

                    httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(httpManager).build();
                    httpAsyncClient.start();

                    LOG.info("Create apache async client successfully");
                    isStarted=true;
                } catch (IOReactorException e) {
                    LOG.error("Create apache async client failed", e);
                    // Rethrow so the waiting caller is told about the failure.
                    throw e;
                }

                return null;
            }
        }).get();
    } finally {
        // The pool is used for this single task only; release its thread promptly.
        executor.shutdown();
    }
}
/**
 * Creates the client: builds a pooling NIO connection manager with the limits from
 * {@code config}, creates the async HTTP client on top of it, and starts it.
 *
 * @param config client configuration (maximum total and per-route connections are read from it)
 * @throws ClientException wrapping the {@code IOReactorException} if the I/O reactor cannot be created
 */
public AsyncInternalClient(Config config){
    this.config = config;
    try {
        PoolingNHttpClientConnectionManager pool = new PoolingNHttpClientConnectionManager(
                new DefaultConnectingIOReactor(IOReactorConfig.custom().build()));
        pool.setMaxTotal(config.getMaxConnectCount());
        pool.setDefaultMaxPerRoute(config.getMaxPerRoute());

        httpClient = createHttpAsyncClient(config, pool);
        httpClient.start();
    } catch (IOReactorException reactorFailure) {
        throw new ClientException(reactorFailure);
    }
}
/**
 * Creates a connecting I/O reactor with the given number of I/O threads and
 * TCP_NODELAY enabled.
 *
 * @param ioThreadCount number of I/O threads for the reactor
 * @return the new reactor
 * @throws IOReactorRuntimeException wrapping the checked {@code IOReactorException}
 *         if the reactor cannot be created
 */
protected static ConnectingIOReactor createConnectingIOReactor(int ioThreadCount) {
    final IOReactorConfig reactorConfig = IOReactorConfig.custom()
            .setIoThreadCount(ioThreadCount)
            .setTcpNoDelay(true)
            .build();

    final ConnectingIOReactor reactor;
    try {
        reactor = new DefaultConnectingIOReactor(reactorConfig);
    } catch (IOReactorException ex) {
        throw new IOReactorRuntimeException(ex);
    }
    return reactor;
}
/**
 * Returns the shared asynchronous HTTP client, creating and starting it on first use.
 *
 * <p>The client is built and started in a local variable and only then stored in the
 * shared field. Callers that take the unsynchronized fast path therefore never see a
 * client that has not been started, and a failure during creation or start does not
 * leave a half-initialised client cached.
 *
 * <p>NOTE(review): the unsynchronized null check is only safe if
 * {@code closeableHttpAsyncClient} is declared {@code volatile}; the declaration is
 * not visible here, so confirm.
 *
 * @return the shared, started client
 * @throws IOReactorException if the connecting I/O reactor cannot be created
 */
public static CloseableHttpAsyncClient getCloseableHttpAsyncClient() throws IOReactorException {
    if (closeableHttpAsyncClient == null) {
        synchronized (WxMpTemplateMsgSender.class) {
            if (closeableHttpAsyncClient == null) {
                RequestConfig requestConfig = RequestConfig.custom()
                        .setConnectTimeout(-1)
                        .setSocketTimeout(-1)
                        .setConnectionRequestTimeout(-1)
                        .build();

                // Configure the I/O threads.
                IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
                        .setIoThreadCount(Runtime.getRuntime().availableProcessors())
                        .setSoKeepAlive(true)
                        .setConnectTimeout(-1)
                        .setSoTimeout(-1)
                        .build();

                // Set up the connection pool.
                ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
                PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor);
                // Maximum total connections.
                connManager.setMaxTotal(5000);
                // Maximum connections per route.
                connManager.setDefaultMaxPerRoute(5000);

                CloseableHttpAsyncClient client = HttpAsyncClients.custom()
                        .setConnectionManager(connManager)
                        .setDefaultRequestConfig(requestConfig)
                        .build();
                client.start();

                // Publish only after start() so no caller can obtain an unstarted client.
                closeableHttpAsyncClient = client;
            }
        }
    }
    return closeableHttpAsyncClient;
}
/**
 * Creates a pooling NIO connection manager from a new I/O reactor and the scheme
 * session-strategy registry, without applying any further settings.
 *
 * @return the new connection manager
 * @throws IllegalStateException wrapping the {@code IOReactorException} if the I/O
 *         reactor cannot be created
 */
PoolingNHttpClientConnectionManager createUnconfiguredPoolingNHttpClientConnectionManager() {
    final PoolingNHttpClientConnectionManager connectionManager;
    try {
        connectionManager = new PoolingNHttpClientConnectionManager(
                createIOReactor(),
                createSchemeIOSessionStrategyRegistry());
    } catch (IOReactorException reactorFailure) {
        throw new IllegalStateException(reactorFailure);
    }
    return connectionManager;
}
/**
 * Verifies that {@code createIOReactor()} obtains its configuration through
 * {@code createIoReactorConfig()}.
 */
@Test
public void ioReactorUsesProvidedIOReactorConfig() throws IOReactorException {

    // given
    ExtendedJestClientFactory factory = spy(
            new ExtendedJestClientFactory(createDefaultTestWrappedHttpClientConfigBuilder().build()));

    // when
    factory.createIOReactor();

    // then
    verify(factory).createIoReactorConfig();
}
/**
 * Verifies that creating the unconfigured connection manager calls
 * {@code createIOReactor()}.
 */
@Test
public void unconfiguredNHttpConnectionManagerUsesIOReactor() throws IOReactorException {

    // given
    ExtendedJestClientFactory factory = spy(
            new ExtendedJestClientFactory(createDefaultTestWrappedHttpClientConfigBuilder().build()));

    // when
    factory.createUnconfiguredPoolingNHttpClientConnectionManager();

    // then
    verify(factory).createIOReactor();
}
/**
 * Creates a pooling NIO connection manager from a new I/O reactor and the scheme
 * session-strategy registry, without applying any further settings.
 *
 * @return the new connection manager
 * @throws IllegalStateException wrapping the {@code IOReactorException} if the I/O
 *         reactor cannot be created
 */
PoolingNHttpClientConnectionManager createUnconfiguredPoolingNHttpClientConnectionManager() {
    final PoolingNHttpClientConnectionManager connectionManager;
    try {
        connectionManager = new PoolingNHttpClientConnectionManager(
                createIOReactor(),
                createSchemeIOSessionStrategyRegistry());
    } catch (IOReactorException reactorFailure) {
        throw new IllegalStateException(reactorFailure);
    }
    return connectionManager;
}
/**
 * Creates the client: builds a pooling NIO connection manager, creates the async
 * HTTP client on top of it, and starts both the client and an idle-connection
 * evictor.
 *
 * @param config client configuration (I/O thread count, connection limit, and
 *               socket timeout are read from it)
 * @throws ClientException wrapping the {@code IOReactorException} if the I/O reactor
 *         cannot be created
 */
public AsyncServiceClient(ClientConfiguration config) {
    try {
        IOReactorConfig reactorConfig = IOReactorConfig.custom()
                .setIoThreadCount(config.getIoThreadCount())
                .build();
        PoolingNHttpClientConnectionManager pool =
                new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(reactorConfig));
        pool.setMaxTotal(config.getMaxConnections());
        pool.setDefaultMaxPerRoute(config.getMaxConnections());
        httpClient = HttpFactory.createHttpAsyncClient(config, pool);

        /*
         * The socketTimeout value bounds how often closeIdleConnections should run.
         * If the period is too long relative to socketTimeout, a request may be handed
         * a connection that is about to hit its socket timeout, and a
         * SocketTimeoutException is thrown before the request is even sent.
         * The period is therefore socketTimeout / 2.5, capped at 5000.
         */
        long closePeriod = config.getSocketTimeoutInMillisecond() > 0
                ? (long) (config.getSocketTimeoutInMillisecond() / 2.5)
                : 5000;
        closePeriod = Math.min(closePeriod, 5000);
        connEvictor = new IdleConnectionEvictor(pool, closePeriod);

        httpClient.start();
        connEvictor.start();
    } catch (IOReactorException ex) {
        throw new ClientException(String.format("IOReactorError: %s",
                ex.getMessage()), ex);
    }
}
/**
 * Load-test entry point: generates 1000 GET requests for
 * {@code http://localhost:8080/hello-world}, runs them through
 * {@code JBender.loadTestThroughput}, records timing events into a histogram and the
 * logger, and prints the histogram's percentile distribution to standard output.
 *
 * @param args command-line arguments (not used)
 */
public static void main(final String[] args) throws SuspendExecution, InterruptedException, ExecutionException, IOReactorException, IOException {
// Constant interval between requests; the unit of 10000000 is not visible here
// (presumably nanoseconds — TODO confirm against ConstantIntervalGenerator).
final IntervalGenerator intervalGenerator = new ConstantIntervalGenerator(10000000);
// The executor is closed automatically at the end of the try-with-resources block.
// The validator rejects a null response and any status code other than 200.
// NOTE(review): 1000000 is presumably the connection limit — confirm against the constructor.
try (final FiberApacheHttpClientRequestExecutor requestExecutor =
new FiberApacheHttpClientRequestExecutor<>((res) -> {
if (res == null) {
throw new AssertionError("Response is null");
}
final int status = res.getStatusLine().getStatusCode();
if (status != 200) {
throw new AssertionError("Status is " + status);
}
}, 1000000)) {
// Channels for outgoing requests and for timing events, each created with argument 1000.
final Channel<HttpGet> requestCh = Channels.newChannel(1000);
final Channel<TimingEvent<CloseableHttpResponse>> eventCh = Channels.newChannel(1000);
// Requests generator
new Fiber<Void>("req-gen", () -> {
// Bench handling 1k reqs
for (int i = 0; i < 1000; ++i) {
requestCh.send(new HttpGet("http://localhost:8080/hello-world"));
}
// Close the channel once all requests have been sent.
requestCh.close();
}).start();
final Histogram histogram = new Histogram(3600000000L, 3);
// Event recording, both HistHDR and logging
record(eventCh, new HdrHistogramRecorder(histogram, 1000000), new LoggingRecorder(LOG));
// Main
// Run the load test in its own fiber and wait for it to finish.
new Fiber<Void>("jbender", () -> {
JBender.loadTestThroughput(intervalGenerator, 0, requestCh, requestExecutor, eventCh);
}).start().join();
// Print the percentile distribution, passing 1000.0 as the output scaling argument.
histogram.outputPercentileDistribution(System.out, 1000.0);
}
}
/**
 * Creates and starts the Apache asynchronous HTTP client on a worker thread and
 * waits for it at a two-party barrier, for at most twice the configured connect
 * timeout (in milliseconds).
 *
 * @param properties source of the reactor and pool settings
 * @throws Exception if the timed wait on the barrier fails (for example because it
 *         times out or the calling thread is interrupted)
 */
public void initialize(final ThunderProperties properties) throws Exception {
    final CyclicBarrier barrier = new CyclicBarrier(2);

    final Callable<Object> startClientTask = new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            try {
                IOReactorConfig reactorConfig = IOReactorConfig.custom()
                        .setIoThreadCount(ThunderConstant.CPUS)
                        .setConnectTimeout(properties.getInteger(ThunderConstant.APACHE_CONNECT_TIMEOUT_ATTRIBUTE_NAME))
                        .setSoTimeout(properties.getInteger(ThunderConstant.APACHE_SO_TIMEOUT_ATTRIBUTE_NAME))
                        .setSndBufSize(properties.getInteger(ThunderConstant.APACHE_SNDBUF_SIZE_ATTRIBUTE_NAME))
                        .setRcvBufSize(properties.getInteger(ThunderConstant.APACHE_RCVBUF_SIZE_ATTRIBUTE_NAME))
                        .setBacklogSize(properties.getInteger(ThunderConstant.APACHE_BACKLOG_SIZE_ATTRIBUTE_NAME))
                        .setTcpNoDelay(true)
                        .setSoReuseAddress(true)
                        .setSoKeepAlive(true)
                        .build();

                PoolingNHttpClientConnectionManager pool =
                        new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(reactorConfig));
                pool.setMaxTotal(ThunderConstant.CPUS * properties.getInteger(ThunderConstant.APACHE_MAX_TOTAL_ATTRIBUTE_NAME));

                httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(pool).build();
                httpAsyncClient.start();

                LOG.info("Create apache async client successfully");

                // Meet the caller only after the client has been started.
                barrier.await();
            } catch (IOReactorException e) {
                LOG.error("Create apache async client failed", e);
            }

            return null;
        }
    };
    Executors.newCachedThreadPool().submit(startClientTask);

    // NOTE(review): if client creation fails, the worker never reaches the barrier,
    // so the caller only finds out when this timed wait expires.
    barrier.await(properties.getLong(ThunderConstant.APACHE_CONNECT_TIMEOUT_ATTRIBUTE_NAME) * 2, TimeUnit.MILLISECONDS);
}
/**
 * Creates the connection manager used by the asynchronous http client.
 *
 * <p>The I/O reactor takes its socket timeout from the client configuration and has
 * TCP_NODELAY enabled; the configured maximum connection count is applied both per
 * route and in total.
 *
 * @return Connection manager for asynchronous http client.
 * @throws IOReactorException in case if a non-recoverable I/O error.
 */
protected NHttpClientConnectionManager createNHttpClientConnectionManager() throws IOReactorException {
    IOReactorConfig reactorConfig = IOReactorConfig.custom()
            .setSoTimeout(this.config.getSocketTimeoutInMillis())
            .setTcpNoDelay(true)
            .build();

    PoolingNHttpClientConnectionManager pool =
            new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(reactorConfig));
    pool.setDefaultMaxPerRoute(this.config.getMaxConnections());
    pool.setMaxTotal(this.config.getMaxConnections());
    return pool;
}
/**
 * Creates an exception that passes the given checked {@code IOReactorException} to
 * the superclass constructor.
 *
 * @param ex the I/O reactor failure being wrapped
 */
public IOReactorRuntimeException(IOReactorException ex) {
super(ex);
}
/**
 * Creates a connecting I/O reactor configured by {@code createIoReactorConfig()}.
 *
 * @return a new {@code DefaultConnectingIOReactor}
 * @throws IOReactorException if the reactor cannot be created
 */
ConnectingIOReactor createIOReactor() throws IOReactorException {
return new DefaultConnectingIOReactor(createIoReactorConfig());
}
/**
 * Creates a connecting I/O reactor configured by {@code createIOReactorConfig()}.
 *
 * @return a new {@code DefaultConnectingIOReactor}
 * @throws IOReactorException if the reactor cannot be created
 */
ConnectingIOReactor createIOReactor() throws IOReactorException {
return new DefaultConnectingIOReactor(createIOReactorConfig());
}
/**
 * Creates an executor that uses the number of available processors as the
 * parallelism passed to the four-argument constructor.
 *
 * @param resValidator   response validator, forwarded unchanged
 * @param maxConnections connection limit, forwarded unchanged
 * @param timeout        timeout value, forwarded unchanged
 * @throws IOReactorException propagated from the delegated constructor
 */
public FiberApacheHttpClientRequestExecutor(final Validator<CloseableHttpResponse> resValidator, final int maxConnections, final int timeout) throws IOReactorException {
this(resValidator, maxConnections, timeout, Runtime.getRuntime().availableProcessors());
}
/**
 * Creates an executor with a timeout of 0, delegating to the three-argument
 * constructor.
 *
 * @param resValidator   response validator, forwarded unchanged
 * @param maxConnections connection limit, forwarded unchanged
 * @throws IOReactorException propagated from the delegated constructor
 */
public FiberApacheHttpClientRequestExecutor(final Validator<CloseableHttpResponse> resValidator, final int maxConnections) throws IOReactorException {
this(resValidator, maxConnections, 0);
}
/**
 * Creates an executor with a {@code null} validator and a timeout of 0, delegating
 * to the three-argument constructor.
 *
 * @param maxConnections connection limit, forwarded unchanged
 * @throws IOReactorException propagated from the delegated constructor
 */
public FiberApacheHttpClientRequestExecutor(final int maxConnections) throws IOReactorException {
this(null, maxConnections, 0);
}