下面列出了 java.util.concurrent.CyclicBarrier 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Prepares the fixture: starts a non-persistent server with a Netty acceptor,
 * creates the JMS connection factory and queue, and resets the synchronization
 * primitives and message counters used by the test.
 *
 * @throws Exception if the superclass setup or the server start fails
 */
@Override
@Before
public void setUp() throws Exception {
super.setUp();
executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
// Persistence is disabled and addresses matching "#" use the PAGE full-policy with a
// 50000-byte page size and a 404850-byte max size.
// NOTE(review): "#" is presumably the match-all address wildcard - confirm.
server = createServer(createBasicConfig()
.setPersistenceEnabled(false)
.setAddressesSettings(Collections.singletonMap("#", new AddressSettings()
.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)
.setPageSizeBytes(50000)
.setMaxSizeBytes(404850)))
.setAcceptorConfigurations(Collections.singleton(new TransportConfiguration(NettyAcceptorFactory.class.getName()))));
server.start();
cf = ActiveMQJMSClient.createConnectionFactory("tcp://127.0.0.1:61616", "cf");
queue = ActiveMQJMSClient.createQueue("simple");
// Both primitives are sized for PRODUCERS + 1 participants.
// NOTE(review): the extra participant is presumably the consumer or the test thread - confirm.
barrierLatch = new CyclicBarrier(PRODUCERS + 1);
runnersLatch = new CountDownLatch(PRODUCERS + 1);
msgReceived = new AtomicLong(0);
msgSent = new AtomicLong(0);
}
/**
 * Closes the writer from another thread while the subscriber is still inside its
 * onSubscribe hook, then expects the subscriber to terminate with completion.
 */
@Test
public void subscribeCloseSynchronously() throws Exception {
    final AtomicReference<Future<?>> closeTaskRef = new AtomicReference<>();
    toSource(cpw.connect().afterOnSubscribe(subscription -> {
        // Meet the closing thread at a barrier: this raises the odds that the writer thread
        // has to wait for the Subscriber to become available rather than for requestN demand.
        final CyclicBarrier rendezvous = new CyclicBarrier(2);
        final Future<?> submitted = executorService.submit(toRunnable(() -> {
            rendezvous.await();
            cpw.close();
        }));
        closeTaskRef.compareAndSet(null, submitted);
        try {
            rendezvous.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    })).subscribe(subscriber);

    final Future<?> pendingClose = closeTaskRef.get();
    assertNotNull(pendingClose);
    pendingClose.get();
    assertThat(subscriber.takeTerminal(), is(complete()));
}
/**
 * Stores the synchronization primitives and the measurement window, configures the
 * service factory, and allocates the per-slot statistics arrays.
 *
 * @param targetIP    address handed to {@code serviceFactory.setTargetIP}
 * @param targetPort  port handed to {@code serviceFactory.setTargetPort}
 * @param clientNums  value handed to {@code serviceFactory.setClientNums}
 * @param rpcTimeout  value handed to {@code serviceFactory.setConnectTimeout}
 * @param barrier     barrier stored for later use by this runnable
 * @param latch       latch stored for later use by this runnable
 * @param startTime   start of the measurement window
 * @param endTime     end of the measurement window, in the same unit as {@code startTime}
 */
public AbstractClientRunnable(String targetIP, int targetPort, int clientNums, int rpcTimeout,
                              CyclicBarrier barrier, CountDownLatch latch, long startTime, long endTime) {
    this.barrier = barrier;
    this.latch = latch;
    this.startTime = startTime;
    this.endTime = endTime;
    serviceFactory.setTargetIP(targetIP);
    serviceFactory.setClientNums(clientNums);
    serviceFactory.setTargetPort(targetPort);
    serviceFactory.setConnectTimeout(rpcTimeout);

    // Divide in long arithmetic and narrow afterwards. The previous
    // Integer.parseInt(String.valueOf(...)) round trip threw NumberFormatException as soon as
    // the raw difference exceeded Integer.MAX_VALUE, even though the slot count itself fits.
    // NOTE(review): 1000000 presumably converts microseconds to seconds - confirm the unit.
    maxRange = (int) ((endTime - startTime) / 1000000L) + 1;

    // Freshly allocated long[] arrays are already zero-filled, so no explicit init loop is needed.
    errorTPS = new long[maxRange];
    errorResponseTimes = new long[maxRange];
    tps = new long[maxRange];
    responseTimes = new long[maxRange];
}
/**
 * Forwards all connection and timing arguments to the superclass and then fills the
 * shared {@code request} with one fixed impression and one fixed device (including geo data).
 */
public BidClientRunnable(String protocol, String serialization, String targetIP, int targetPort, int clientNums, int rpcTimeout,
                         CyclicBarrier barrier, CountDownLatch latch, long startTime,
                         long endTime) {
    super(protocol, serialization, targetIP, targetPort, clientNums, rpcTimeout, barrier, latch, startTime, endTime);

    // A single impression with a fixed floor price and id.
    Impression impression = new Impression();
    impression.setBidFloor(1.1);
    impression.setId("abc");
    List<Impression> impressions = new ArrayList<Impression>(1);
    impressions.add(impression);
    request.setImpressions(impressions);

    // Fixed location attached to the device below.
    Geo location = new Geo();
    location.setCity("beijing");
    location.setCountry("china");
    location.setLat(100.1f);
    location.setLon(100.1f);

    // Fixed device description.
    Device handset = new Device();
    handset.setMake("apple");
    handset.setOs("ios");
    handset.setVersion("7.0");
    handset.setLang("zh_CN");
    handset.setModel("iphone");
    handset.setGeo(location);
    request.setDevice(handset);
}
/**
 * Runs the same task on two executor threads. Each task stores its own random global
 * transaction id in {@code omegaContext}, waits until the other task has done the same,
 * and then checks that it still reads back its own id.
 */
@Test
public void eachThreadGetsDifferentGlobalTxId() throws Exception {
    final CyclicBarrier barrier = new CyclicBarrier(2);
    Runnable runnable = exceptionalRunnable(new ExceptionalRunnable() {
        @Override
        public void run() throws Exception {
            String txId = UUID.randomUUID().toString();
            omegaContext.setGlobalTxId(txId);
            // Both tasks have written their id once the barrier trips.
            barrier.await();
            assertThat(omegaContext.globalTxId(), is(txId));
        }
    });

    // Wildcard-typed futures instead of raw types; the result value is never inspected.
    Future<?> f1 = executor.submit(runnable);
    Future<?> f2 = executor.submit(runnable);
    // get() rethrows any assertion failure raised inside the tasks.
    f1.get();
    f2.get();
}
/**
 * Resetting a barrier while threads are waiting on it makes those
 * threads fail with BrokenBarrierException.
 */
public void testReset_BrokenBarrier() throws InterruptedException {
    final CyclicBarrier barrier = new CyclicBarrier(3);
    final CountDownLatch bothRunning = new CountDownLatch(2);
    final Thread[] waiters = new Thread[2];
    for (int i = 0; i < waiters.length; i++) {
        waiters[i] = new ThreadShouldThrow(BrokenBarrierException.class) {
            public void realRun() throws Exception {
                bothRunning.countDown();
                barrier.await();
            }};
    }
    for (Thread waiter : waiters) {
        waiter.start();
    }
    // Reset only once both threads have started and are counted as waiting on the barrier.
    await(bothRunning);
    awaitNumberWaiting(barrier, 2);
    barrier.reset();
    for (Thread waiter : waiters) {
        awaitTermination(waiter);
    }
}
/**
 * Stores the synchronization primitives and the measurement window, configures the
 * service factory, and allocates the per-slot statistics arrays.
 *
 * @param targetIP    address handed to {@code serviceFactory.setTargetIP}
 * @param targetPort  port handed to {@code serviceFactory.setTargetPort}
 * @param clientNums  value handed to {@code serviceFactory.setClientNums}
 * @param rpcTimeout  value handed to {@code serviceFactory.setConnectTimeout}
 * @param barrier     barrier stored for later use by this runnable
 * @param latch       latch stored for later use by this runnable
 * @param startTime   start of the measurement window
 * @param endTime     end of the measurement window, in the same unit as {@code startTime}
 */
public AbstractClientRunnable(String targetIP, int targetPort, int clientNums, int rpcTimeout,
                              CyclicBarrier barrier, CountDownLatch latch, long startTime, long endTime) {
    this.barrier = barrier;
    this.latch = latch;
    this.startTime = startTime;
    this.endTime = endTime;
    serviceFactory.setTargetIP(targetIP);
    serviceFactory.setClientNums(clientNums);
    serviceFactory.setTargetPort(targetPort);
    serviceFactory.setConnectTimeout(rpcTimeout);

    // Divide in long arithmetic and narrow afterwards. The previous
    // Integer.parseInt(String.valueOf(...)) round trip threw NumberFormatException as soon as
    // the raw difference exceeded Integer.MAX_VALUE, even though the slot count itself fits.
    // NOTE(review): 1000000 presumably converts microseconds to seconds - confirm the unit.
    maxRange = (int) ((endTime - startTime) / 1000000L) + 1;

    // Freshly allocated long[] arrays are already zero-filled, so no explicit init loop is needed.
    errorTPS = new long[maxRange];
    errorResponseTimes = new long[maxRange];
    tps = new long[maxRange];
    responseTimes = new long[maxRange];
}
/**
 * Splits the file into slices, installs a barrier whose action prints the elapsed time
 * and the line counter, and submits one reader task per slice.
 * If computing the slice boundaries fails with an IOException the stack trace is
 * printed and nothing is submitted.
 */
public void start() {
    final long sliceSize = this.fileLength / this.threadSize;
    try {
        calculateStartEnd(0, sliceSize);
    } catch (IOException e) {
        e.printStackTrace();
        return;
    }

    final long startTime = System.currentTimeMillis();
    // Barrier action: runs once every slice has reached the barrier.
    final Runnable report = new Runnable() {
        @Override
        public void run() {
            System.out.println("use time: " + (System.currentTimeMillis() - startTime));
            System.out.println("all line: " + counter.get());
        }
    };
    cyclicBarrier = new CyclicBarrier(startEndPairs.size(), report);

    for (StartEndPair slice : startEndPairs) {
        System.out.println("分配分片:" + slice);
        this.executorService.execute(new SliceReaderTask(slice));
    }
}
/**
 * A barrier for two parties trips each time both threads have called await;
 * the barrier is reused for four consecutive rounds.
 */
public void testTwoParties() throws Exception {
    final int rounds = 4;
    final CyclicBarrier b = new CyclicBarrier(2);
    Thread partner = newStartedThread(new CheckedRunnable() {
        public void realRun() throws Exception {
            for (int round = 0; round < rounds; round++) {
                b.await();
            }
        }});
    for (int round = 0; round < rounds; round++) {
        b.await();
    }
    awaitTermination(partner);
}
/**
 * Configures the binding for {@code item}, sends {@code command} through it and returns
 * the packet object that the mocked transmit socket is set up to fill.
 *
 * @throws Exception if the send is not observed within one second or the setup fails
 */
private DatagramPacket sendCommand(Item item, String itemConfig, Command command) throws Exception {
    // Mock wiring: both receive sockets are stubbed with waitIndefinitely(), the transmit
    // socket with an answer that is given the packet and the barrier.
    final int bufferSize = 1024;
    final DatagramPacket captured = new DatagramPacket(new byte[bufferSize], bufferSize);
    final CyclicBarrier sendRendezvous = new CyclicBarrier(2);
    doAnswer(waitIndefinitely()).when(mockReceiveSocket).receive(any(DatagramPacket.class));
    doAnswer(waitIndefinitely()).when(mockReceiveSocket2).receive(any(DatagramPacket.class));
    doAnswer(transmitAnswer(captured, sendRendezvous)).when(mockTransmitSocket).send(any(DatagramPacket.class));

    // Register the item configuration with the provider and attach the provider.
    bindingProvider.processBindingConfiguration(CONTEXT, item, itemConfig);
    binding.addBindingProvider(bindingProvider);

    // Bring the binding into its active state.
    binding.activateForTesting();

    // Dispatch the command under test.
    binding.internalReceiveCommand(item.getName(), command);

    // Block (at most one second) until the transmit side reaches the barrier.
    sendRendezvous.await(1000, TimeUnit.MILLISECONDS);
    return captured;
}
/**
 * Records ThreadCPULoad events while one CPU-consuming thread performs a single pass,
 * then checks that events exist and verifies the per-thread invariant for that thread.
 */
static void testSimple() throws Throwable {
    Recording rec = new Recording();
    // Without an explicit period the event would only be emitted once per chunk.
    rec.enable(EventNames.ThreadCPULoad).withPeriod(Duration.ofMillis(eventPeriodMillis));
    rec.start();

    Duration runTime = Duration.ofMillis(eventPeriodMillis * cpuConsumerRunFactor);
    CyclicBarrier passBarrier = new CyclicBarrier(2);
    CpuConsumingThread consumer = new CpuConsumingThread(runTime, passBarrier);

    // One pass only: the two awaits are the rendezvous points shared with the consumer thread.
    consumer.start();
    passBarrier.await();
    passBarrier.await();

    rec.stop();
    List<RecordedEvent> recorded = Events.fromRecording(rec);
    Events.hasEvents(recorded);
    verifyPerThreadInvariant(recorded, cpuConsumerThreadName);

    consumer.interrupt();
    consumer.join();
}
/**
 * Starts {@code numThreads} threads that each wait on a shared barrier and then run
 * {@code runnable}, so that no thread begins the task before all of them are ready.
 *
 * @param numThreads number of threads to start; {@code 0} starts nothing
 * @param runnable   task executed once by every thread
 * @return the started threads, in creation order (empty when {@code numThreads} is 0)
 */
Thread[] runInThreads(int numThreads, final Runnable runnable) {
    final Thread[] threads = new Thread[numThreads];
    if (numThreads == 0) {
        // CyclicBarrier rejects a party count below one, and there is nothing to start anyway.
        return threads;
    }
    final CyclicBarrier barrier = new CyclicBarrier(numThreads);
    for (int i = 0; i < numThreads; i++) {
        Thread thread = new Thread() {
            @Override
            public void run() {
                try {
                    barrier.await();
                    runnable.run();
                } catch (InterruptedException e) {
                    // Restore the flag so code inspecting this thread still sees the interrupt.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        };
        threads[i] = thread;
        thread.start();
    }
    return threads;
}
/**
 * Execute the provided task in the EventLoopGroup only once there
 * are no more running/queued tasks (but might be future scheduled tasks).
 * The key thing here is that it will continue to wait if new tasks
 * are scheduled by the already running/queued ones.
 *
 * @param task the task to hand to {@code internalExecutor} once every event loop reports no pending tasks
 */
private void executeWhenIdle(Runnable task) {
AtomicInteger remainingTasks = new AtomicInteger(-1);
// Two "cycles" are performed, the first with remainingTasks == -1.
// If remainingTasks > 0 after the second cycle, this method
// is re-called recursively (in an async context)
CyclicBarrier cb = new CyclicBarrier(internalExecutor.executorCount(), () -> {
// Barrier action: runs each time all executorCount() parties have arrived.
int rt = remainingTasks.get();
if (rt == -1) {
// End of the first cycle: move the counter from -1 to 0 so the second cycle can count.
remainingTasks.incrementAndGet();
} else if (rt > 0) {
// At least one loop saw pending tasks during the second cycle: start over.
executeWhenIdle(task);
} else {
// No loop reported pending tasks: run the task now.
internalExecutor.execute(task);
}
});
internalExecutor.forEach(ex -> ex.execute(new Runnable() {
@Override public void run() {
SingleThreadEventLoop stel = (SingleThreadEventLoop) ex;
try {
if (stel.pendingTasks() > 0) {
// Still busy: re-enqueue this probe behind the pending tasks.
ex.execute(this);
} else {
// First rendezvous: every loop has reached a point with no pending tasks.
cb.await();
// Re-check after the rendezvous and record whether new work has appeared.
if (stel.pendingTasks() > 0) {
remainingTasks.incrementAndGet();
}
// Second rendezvous: the barrier action decides between retrying and running the task.
cb.await();
}
} catch (InterruptedException|BrokenBarrierException e) {
// Either failure sets the interrupt flag on the current thread.
Thread.currentThread().interrupt();
}
}
}));
}
/**
 * Starts ten threads that begin together, each storing its own name in the
 * ThreadLocalContext, printing it after a random pause, resetting the context and
 * printing the value again. The test thread waits until all ten have finished.
 */
@Test
public void test() throws InterruptedException {
    final int threadCount = 10;
    final CountDownLatch countdown = Concurrents.countDownLatch(threadCount);
    final CyclicBarrier barrier = Concurrents.cyclicBarrier(threadCount);
    final Runnable worker = new Runnable() {
        @Override
        public void run() {
            // Line up with the other workers; a failure here is only printed.
            try {
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            final String self = Thread.currentThread().getName();
            ThreadLocalContext.put("myname", self);
            ThreadUtil.sleep(RandomUtil.nextLong(100, 300));
            final String stored = (String) ThreadLocalContext.get("myname");
            System.out.println(stored);
            ThreadLocalContext.reset();
            System.out.println(
                    "shoud null for " + self + ":" + ThreadLocalContext.get("myname"));
            countdown.countDown();
        }
    };
    for (int started = 0; started < threadCount; started++) {
        new Thread(worker).start();
    }
    countdown.await();
}
/**
 * Checks that cancelling the subscription makes a pending/later write on the writer
 * thread fail instead of blocking: the test expects the writer task to end with an
 * IOException, the subscriber to have received only the first item, and a later
 * close() and request/cancel to be harmless.
 */
@Test
public void cancelUnblocksWrite() throws Exception {
CyclicBarrier afterFlushBarrier = new CyclicBarrier(2);
// Writer task: write and flush 1, meet the test thread at the barrier, then write and flush 2.
Future<?> f = executorService.submit(toRunnable(() -> {
cbos.write(1);
cbos.flush();
afterFlushBarrier.await();
cbos.write(2);
cbos.flush();
}));
toSource(cbos.connect()).subscribe(subscriber);
// Demand for exactly one item, so only the first write has demand to satisfy.
subscriber.request(1);
// Wait until the writer has flushed the first item, then cancel before the second write.
afterFlushBarrier.await();
subscriber.cancel();
try {
f.get();
// The writer task finishing normally is a test failure.
fail();
} catch (ExecutionException e) {
verifyCheckedRunnableException(e, IOException.class);
}
assertThat(subscriber.takeItems(), contains(buf(1)));
assertThat(subscriber.takeTerminal(), is(complete()));
cbos.close(); // should be idempotent
// Make sure the Subscription thread isn't blocked.
subscriber.request(1);
subscriber.cancel();
}
/**
 * Fills in defaults for the thread count and the seed when they are zero, creates the
 * random generator and the barrier (one party more than {@code numThreads}), and prints
 * the effective thread count and seed.
 */
void init() {
    if (numThreads == 0) {
        // No explicit thread count: fall back to the number of available processors.
        final int cpus = Runtime.getRuntime().availableProcessors();
        numThreads = cpus;
    }
    if (seed == 0) {
        // No explicit seed: pick a random one so it can still be printed below.
        seed = new Random().nextLong();
    }
    rand = new Random(seed);
    l = new CyclicBarrier(numThreads + 1);
    System.out.format("Threads: %d\n", numThreads);
    System.out.format("Seed: %d\n", seed);
}
/**
 * Tests that we can concurrently create two {@link ActorSystem} without port conflicts.
 * This effectively tests that we don't open a socket to check for a ports availability.
 * See FLINK-10580 for more details.
 *
 * @throws Exception if any actor system fails to start or to terminate
 */
@Test
public void testConcurrentActorSystemCreation() throws Exception {
final int concurrentCreations = 10;
// One pool thread and one barrier party per creation.
final ExecutorService executorService = Executors.newFixedThreadPool(concurrentCreations);
final CyclicBarrier cyclicBarrier = new CyclicBarrier(concurrentCreations);
try {
final List<CompletableFuture<Void>> actorSystemFutures = IntStream.range(0, concurrentCreations)
.mapToObj(
ignored ->
CompletableFuture.supplyAsync(
CheckedSupplier.unchecked(() -> {
// No task starts an actor system before all tasks have reached this point.
cyclicBarrier.await();
// NOTE(review): the "0" argument is presumably a port spec asking for any free port - confirm.
return BootstrapTools.startActorSystem(
new Configuration(),
"localhost",
"0",
LOG);
}), executorService))
.map(
// terminate ActorSystems
actorSystemFuture ->
actorSystemFuture.thenCompose(AkkaUtils::terminateActorSystem)
).collect(Collectors.toList());
// get() surfaces a failure from any of the start/terminate chains.
FutureUtils.completeAll(actorSystemFutures).get();
} finally {
ExecutorUtils.gracefulShutdown(10000L, TimeUnit.MILLISECONDS, executorService);
}
}
/**
 * Passes every argument through to the superclass, then creates the column variables
 * and the high- and low-detail column drawing functions for the indexed renderer.
 */
public Indexed(DoomMain<byte[], byte[]> DOOM, int id,
               byte[] screen, TextureManager<byte[]> texman,
               RenderSegInstruction<byte[]>[] RSI, short[] BLANKCEILINGCLIP,
               short[] BLANKFLOORCLIP, short[] ceilingclip, short[] floorclip,
               int[] columnofs, long[] xtoviewangle, int[] ylookup,
               visplane_t[] visplanes, CyclicBarrier barrier, LightsAndColors<byte[]> colormaps) {
    super(DOOM, id, screen, texman, RSI, BLANKCEILINGCLIP,
            BLANKFLOORCLIP, ceilingclip, floorclip, columnofs, xtoviewangle,
            ylookup, visplanes, barrier, colormaps);

    dcvars = new ColVars<>();

    // The high-detail function doubles as the initially active one.
    colfunchi = new R_DrawColumnBoomOpt.Indexed(
            DOOM.vs.getScreenWidth(), DOOM.vs.getScreenHeight(),
            ylookup, columnofs, dcvars, screen, null);
    colfunc = colfunchi;

    colfunclow = new R_DrawColumnBoomOptLow.Indexed(
            DOOM.vs.getScreenWidth(), DOOM.vs.getScreenHeight(),
            ylookup, columnofs, dcvars, screen, null);
}
/**
 * Hands the id and synchronization primitives to the superclass and initializes the
 * workload references, the operation controller and the insert loop index.
 *
 * @param id             client id, also passed to the new {@code OperationController}
 * @param countDownLatch latch forwarded to the superclass
 * @param barrier        barrier forwarded to the superclass
 * @param workload       workload stored as the synthetic workload
 */
public BaseClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier,
                  IWorkload workload) {
    super(id, countDownLatch, barrier);
    this.syntheticWorkload = workload;
    this.singletonWorkload = SingletonWorkload.getInstance();
    this.operationController = new OperationController(id);
    this.insertLoopIndex = 0;
}
/**
 * Subscribes fifty subscribers to a multicast publisher and lets fifty pool threads
 * issue their requests concurrently, then checks that no task failed, that the upstream
 * subscription saw a total demand of fifty and that it was not cancelled.
 */
@Test
public void concurrentRequestN() throws InterruptedException {
    final int expectedSubscribers = 50;
    final int lastIndex = expectedSubscribers - 1;
    Publisher<Integer> multicast = source.multicastToExactly(expectedSubscribers, expectedSubscribers);
    @SuppressWarnings("unchecked")
    TestPublisherSubscriber<Integer>[] subscribers = (TestPublisherSubscriber<Integer>[])
            new TestPublisherSubscriber[expectedSubscribers];

    // Subscribe all but the last subscriber first ...
    for (int idx = 0; idx < lastIndex; ++idx) {
        subscribers[idx] = new TestPublisherSubscriber<>();
        toSource(multicast).subscribe(subscribers[idx]);
    }
    // ... then the last one, and check the earlier ones afterwards.
    subscribers[lastIndex] = new TestPublisherSubscriber<>();
    toSource(multicast).subscribe(subscribers[lastIndex]);
    for (int idx = 0; idx < lastIndex; ++idx) {
        assertThat(subscribers[idx].subscriptionReceived(), is(true));
    }
    source.onSubscribe(subscription);

    // Direct hand-off pool with up to one thread per subscriber.
    ExecutorService pool = new ThreadPoolExecutor(0, expectedSubscribers, 1, SECONDS,
            new SynchronousQueue<>());
    try {
        CyclicBarrier startBarrier = new CyclicBarrier(expectedSubscribers);
        CountDownLatch finished = new CountDownLatch(expectedSubscribers);
        AtomicReference<Throwable> failure = new AtomicReference<>();
        for (int n = 1; n <= expectedSubscribers; ++n) {
            pool.execute(requestIRunnable(subscribers, n, startBarrier, failure, finished));
        }
        finished.await();
        assertThat(failure.get(), is(nullValue()));
        assertThat(subscription.requested(), is((long) expectedSubscribers));
        assertThat(subscription.isCancelled(), is(false));
    } finally {
        pool.shutdown();
    }
}
/**
 * Encodes and decodes the same log entry {@code TIMES} times, checking every round trip
 * and adding each encoded size to {@code logSize}.
 *
 * @param encoder encoder under test
 * @param decoder decoder under test
 * @param barrier optional barrier awaited once before and once after the loop;
 *                {@code null} skips both waits
 * @throws Exception if a barrier wait fails
 */
private void testEncodeDecode(final LogEntryEncoder encoder, final LogEntryDecoder decoder,
                              final CyclicBarrier barrier) throws Exception {
    ByteBuffer buf = ByteBuffer.wrap(DATA);
    LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
    entry.setData(buf);
    entry.setPeers(Arrays.asList(new PeerId("localhost", 99, 1), new PeerId("localhost", 100, 2)));

    if (barrier != null) {
        barrier.await();
    }

    for (int i = 0; i < TIMES; i++) {
        entry.setId(new LogId(i, i));
        byte[] content = encoder.encode(entry);
        // Explicit check instead of the `assert` keyword, which is skipped unless the JVM
        // runs with -ea.
        if (content.length == 0) {
            throw new AssertionError("encoder produced an empty payload for log index " + i);
        }
        this.logSize.addAndGet(content.length);

        LogEntry nLog = decoder.decode(content);
        assertEquals(2, nLog.getPeers().size());
        assertArrayEquals(DATA, nLog.getData().array());
        assertEquals(i, nLog.getId().getIndex());
        assertEquals(i, nLog.getId().getTerm());
    }

    if (barrier != null) {
        barrier.await();
    }
}
/**
 * Registers and deregisters a cache entry listener configuration 100 times from each of
 * ten threads that start together.
 *
 * @throws Exception If failed.
 */
@Test
public void testConcurrentRegisterDeregister() throws Exception {
    final int THREADS = 10;
    final CyclicBarrier barrier = new CyclicBarrier(THREADS);
    final IgniteCache<Object, Object> cache = jcache(0);

    Callable<Void> worker = new Callable<Void>() {
        @Override public Void call() throws Exception {
            // Each thread builds its own listener configuration.
            Factory<CacheEntryListener<Object, Object>> lsnrFactory =
                new Factory<CacheEntryListener<Object, Object>>() {
                    @Override public CacheEntryListener<Object, Object> create() {
                        return new CreateUpdateRemoveExpireListener();
                    }
                };
            CacheEntryListenerConfiguration<Object, Object> cfg =
                new MutableCacheEntryListenerConfiguration<>(lsnrFactory, null, true, false);

            // Start the register/deregister loop in all threads at once.
            barrier.await();

            for (int round = 0; round < 100; round++) {
                cache.registerCacheEntryListener(cfg);
                cache.deregisterCacheEntryListener(cfg);
            }

            return null;
        }
    };

    GridTestUtils.runMultiThreadedAsync(worker, THREADS, "register-thread").get();
}
/**
 * Passes every argument through to the superclass, then creates the high- and
 * low-detail column drawing functions (both without column variables or an extra
 * trailing argument, which are passed as {@code null}).
 */
public Indexed(int SCREENWIDTH, int SCREENHEIGHT, int[] columnofs,
               int[] ylookup, byte[] screen,
               ColVars<byte[], byte[]>[] RWI, CyclicBarrier barrier) {
    super(SCREENWIDTH, SCREENHEIGHT, columnofs, ylookup, screen, RWI, barrier);

    // The high-detail function doubles as the initially active one.
    colfunchi = new R_DrawColumnBoomOpt.Indexed(
            SCREENWIDTH, SCREENHEIGHT, ylookup, columnofs, null, screen, null);
    colfunc = colfunchi;

    colfunclow = new R_DrawColumnBoomOptLow.Indexed(
            SCREENWIDTH, SCREENHEIGHT, ylookup, columnofs, null, screen, null);
}
/**
 * Creates the two-party barrier and the task for the conflicting thread: the task waits
 * on the barrier and then increments the shared static {@code field}
 * {@code INNER_ITERATIONS} times.
 *
 * @param monitor object forwarded to the superclass constructor
 */
public MemoryConflictProvoker(Object monitor) {
    super(monitor);
    this.barrier = new CyclicBarrier(2);
    this.conflictingThread = () -> {
        // Any failure while waiting on the barrier is rethrown unchecked.
        try {
            this.barrier.await();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        for (int iteration = 0; iteration < MemoryConflictProvoker.INNER_ITERATIONS; iteration++) {
            MemoryConflictProvoker.field++;
        }
    };
}
/**
 * Closes the writer from another thread without writing anything and expects the
 * subscriber to see no items and a normal completion.
 */
@Test
public void closeNoWrite() throws Exception {
    final CyclicBarrier rendezvous = new CyclicBarrier(2);
    final Future<?> closeTask = executorService.submit(toRunnable(() -> {
        rendezvous.await();
        cpw.close();
    }));

    final Publisher<String> publisher = cpw.connect();
    // Release the closing thread only after connect() has returned.
    rendezvous.await();
    toSource(publisher).subscribe(subscriber);
    subscriber.request(1);
    closeTask.get();

    assertThat(subscriber.takeItems(), is(empty()));
    assertThat(subscriber.takeTerminal(), is(complete()));
}
/**
 * Reset of a barrier after interruption reinitializes it.
 * The scenario is run twice on the same barrier, so the second pass exercises the
 * barrier after it has been reset.
 */
public void testResetAfterInterrupt() throws Exception {
// Three parties, but only t1 and t2 ever wait on it, so it never trips normally.
final CyclicBarrier barrier = new CyclicBarrier(3);
for (int i = 0; i < 2; i++) {
// Start gate shared by t1, t2 and the test thread.
final CyclicBarrier start = new CyclicBarrier(3);
// t1 is the thread that gets interrupted and is expected to end with InterruptedException.
Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
public void realRun() throws Exception {
start.await();
barrier.await();
}};
// t2 is not interrupted itself and is expected to end with BrokenBarrierException.
Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
public void realRun() throws Exception {
start.await();
barrier.await();
}};
t1.start();
t2.start();
// Both threads have passed the start gate once this returns.
start.await();
t1.interrupt();
awaitTermination(t1);
awaitTermination(t2);
// After the interruption the barrier reports broken with nobody waiting.
assertTrue(barrier.isBroken());
assertEquals(0, barrier.getNumberWaiting());
// reset() must clear the broken state so the next iteration starts fresh.
barrier.reset();
assertFalse(barrier.isBroken());
assertEquals(0, barrier.getNumberWaiting());
}
}
/**
 * Vector serialization deadlock test: two vectors that reference each other are handed
 * to two test threads, and the main thread waits for both to finish.
 * The test hangs if serialization deadlocks.
 */
public static void main(final String[] args) throws Exception {
    final Vector<Object> first = new Vector<>();
    final Vector<Object> second = new Vector<>();
    final TestBarrier testStart = new TestBarrier(3);

    // Cross-link the vectors: each holds the start barrier and the other vector.
    first.add(testStart);
    first.add(second);
    second.add(testStart);
    second.add(first);

    final CyclicBarrier testEnd = new CyclicBarrier(3);
    final TestThread worker1 = new TestThread(first, testEnd);
    final TestThread worker2 = new TestThread(second, testEnd);
    worker1.start();
    worker2.start();

    // Returns once both workers have begun serializing 'testStart'
    // (and therefore both vectors).
    testStart.await();

    // Returns once both workers have finished serializing their vector.
    System.out.println("Waiting for Vector serialization to complete ...");
    System.out.println("(This test will hang if serialization deadlocks)");
    testEnd.await();
    System.out.println("Test PASSED: serialization completed successfully");

    TestThread.handleExceptions();
}
/**
 * Builds a {@code CheckpointInProgressRequest} whose second callback increments
 * {@code cancelCounter} and then waits on {@code cb}.
 * NOTE(review): judging by the method name the second callback is the cancel handler -
 * confirm against the CheckpointInProgressRequest constructor.
 *
 * @param cancelCounter counter incremented each time the second callback runs
 * @param cb            barrier the second callback waits on after counting
 * @return the new request
 */
private CheckpointInProgressRequest cancelCountingRequest(AtomicInteger cancelCounter, CyclicBarrier cb) {
return new CheckpointInProgressRequest(
"test",
1L,
// First callback: intentionally does nothing.
unused -> {
},
// Second callback: count the invocation, then block on the barrier.
unused -> {
cancelCounter.incrementAndGet();
await(cb);
},
false
);
}
/**
 * Calls {@code shutdownNow()} on an {@code OrderedExecutor} from inside one of its own
 * tasks while 100 further tasks are queued, and expects those 100 tasks to be reported,
 * the executor to be in the forced-shutdown state, and nothing to remain queued.
 */
@Test
public void shutdownNowOnDelegateExecutor() throws Exception {
    final ExecutorService executorService = Executors.newSingleThreadExecutor();
    try {
        final OrderedExecutor executor = new OrderedExecutor(executorService);
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final AtomicInteger numberOfTasks = new AtomicInteger(0);
        final CountDownLatch ran = new CountDownLatch(1);

        // First task: wait until the test thread has queued everything, then shut down.
        executor.execute(() -> {
            try {
                barrier.await(1, TimeUnit.MINUTES);
                numberOfTasks.set(executor.shutdownNow());
                ran.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        for (int i = 0; i < 100; i++) {
            executor.execute(() -> System.out.println("Dont worry, this will never happen"));
        }

        barrier.await();
        // Fail on the timeout itself rather than on a misleading task-count mismatch below.
        Assert.assertTrue("shutdownNow() task did not finish in time", ran.await(1, TimeUnit.SECONDS));
        Assert.assertEquals(100, numberOfTasks.get());
        Assert.assertEquals(ProcessorBase.STATE_FORCED_SHUTDOWN, executor.status());
        Assert.assertEquals(0, executor.remaining());
    } finally {
        executorService.shutdown();
    }
}
/**
 * Creates and starts the Apache async HTTP client on a background thread and waits for
 * it to report success.
 *
 * <p>The background task only reaches the barrier after the client has started; if
 * creating the I/O reactor fails, the error is logged and the caller's wait ends when
 * its timeout (twice the configured connect timeout, in milliseconds) elapses.
 *
 * @param properties source of the socket, buffer and pool settings
 * @throws Exception if waiting on the barrier times out, is interrupted or the barrier breaks
 */
public void initialize(final ThunderProperties properties) throws Exception {
    final CyclicBarrier barrier = new CyclicBarrier(2);
    final java.util.concurrent.ExecutorService bootstrapExecutor = Executors.newCachedThreadPool();
    try {
        bootstrapExecutor.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
                            .setIoThreadCount(ThunderConstant.CPUS)
                            .setConnectTimeout(properties.getInteger(ThunderConstant.APACHE_CONNECT_TIMEOUT_ATTRIBUTE_NAME))
                            .setSoTimeout(properties.getInteger(ThunderConstant.APACHE_SO_TIMEOUT_ATTRIBUTE_NAME))
                            .setSndBufSize(properties.getInteger(ThunderConstant.APACHE_SNDBUF_SIZE_ATTRIBUTE_NAME))
                            .setRcvBufSize(properties.getInteger(ThunderConstant.APACHE_RCVBUF_SIZE_ATTRIBUTE_NAME))
                            .setBacklogSize(properties.getInteger(ThunderConstant.APACHE_BACKLOG_SIZE_ATTRIBUTE_NAME))
                            .setTcpNoDelay(true)
                            .setSoReuseAddress(true)
                            .setSoKeepAlive(true)
                            .build();
                    ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
                    PoolingNHttpClientConnectionManager httpManager = new PoolingNHttpClientConnectionManager(ioReactor);
                    httpManager.setMaxTotal(ThunderConstant.CPUS * properties.getInteger(ThunderConstant.APACHE_MAX_TOTAL_ATTRIBUTE_NAME));
                    httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(httpManager).build();
                    httpAsyncClient.start();
                    LOG.info("Create apache async client successfully");
                    // Signal the waiting caller only after a successful start.
                    barrier.await();
                } catch (IOReactorException e) {
                    LOG.error("Create apache async client failed", e);
                }
                return null;
            }
        });
    } finally {
        // The pool exists only for this one task. shutdown() lets the submitted task finish
        // but stops the pool from lingering after it, so repeated calls do not pile up pools.
        bootstrapExecutor.shutdown();
    }
    barrier.await(properties.getLong(ThunderConstant.APACHE_CONNECT_TIMEOUT_ATTRIBUTE_NAME) * 2, TimeUnit.MILLISECONDS);
}