下面列出了怎么用java.util.concurrent.Executors的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Start a separate thread for polling the JVM for heap memory usage.
*/
private void startMemoryPoolPoller() {
if (tenuredMemoryPoolMXBean == null) {
return;
}
final ThreadGroup threadGroup = LogWriterImpl.createThreadGroup("HeapPoller", this.cache.getLoggerI18n());
final ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(threadGroup, r, "GemfireHeapPoller");
thread.setDaemon(true);
return thread;
}
};
this.pollerExecutor = Executors.newScheduledThreadPool(1, threadFactory);
this.pollerExecutor.scheduleAtFixedRate(new HeapPoller(), POLLER_INTERVAL, POLLER_INTERVAL, TimeUnit.MILLISECONDS);
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().fine("Started GemfireHeapPoller to poll the heap every " + POLLER_INTERVAL + " milliseconds");
}
}
@Test()
public void canHandleMultipleThreads() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(HttpWorkflowStepPlugin.HTTP_METHODS.length);
ArrayList<Future<Boolean>> results = new ArrayList<>();
for(String method : HttpWorkflowStepPlugin.HTTP_METHODS) {
results.add(executor.submit(() -> {
HttpWorkflowStepPlugin threadedPlugin = new HttpWorkflowStepPlugin();
try {
threadedPlugin.executeStep(new PluginStepContextImpl(), this.getOAuthOptions(method));
return true;
} catch(StepException se) {
se.printStackTrace();
return false;
}
}));
}
assertEquals(HttpWorkflowStepPlugin.HTTP_METHODS.length, results.size());
for(Future<Boolean> result : results) {
assertTrue(result.get());
}
}
private void executeTask(List<SiteProperties.SiteCate> cateList, List<SiteProperties.SiteInfo> sites)
{
threadPoolNum = threadPoolNum < cateList.size() ? threadPoolNum : sites.size();
ExecutorService executorService = Executors.newFixedThreadPool(threadPoolNum);
for (SiteProperties.SiteCate cate : cateList)
{
for (SiteProperties.SiteInfo site : cate.getSites()) {
executorService.submit(() -> {
try {
HotProcessor hotProcessor = null;
hotProcessor = (HotProcessor) baseService.getBean(Class.forName(site.getProcessorClassPath()));
List<Info> infoList = hotProcessor.crawlHotList();
infoRepository.removeByTypeId(site.getCode());
infoRepository.saveAll(infoList, site.getCode());
} catch (RuntimeException | ClassNotFoundException e) {
log.error(e.getMessage(), e);
}
});
}
}
}
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
ResourcePool pool = new ResourcePool<Integer>(15,
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 10, 11, 12, 13, 14));
Random random = new Random();
for (int i = 0; i < 30; i++) {
executor.execute(() -> {
try {
Object value = pool.get(60);
System.out.println("Value taken: " + value);
Thread.sleep(random.nextInt(5000));
pool.release(value);
System.out.println("Value released " + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
@Override
public Object processAnnotation(String name, Object[] args, Callable<Object> statement) throws Exception {
if ("timeout".equals(name) && args != null && args.length > 0) {
long ms = args[0] instanceof Number
? ((Number) args[0]).longValue()
: Long.parseLong(args[0].toString());
Object def = args.length > 1? args[1] : null;
if (ms > 0) {
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<?> future = null;
try {
future = executor.submit(statement);
return future.get(ms, TimeUnit.MILLISECONDS);
} catch (TimeoutException xtimeout) {
if (future != null) {
future.cancel(true);
}
} finally {
executor.shutdown();
}
}
return def;
}
return statement.call();
}
@Before
public void before() throws FileNotFoundException, IOException {
createUsers();
createGroups();
options = new HashMap<>();
options.put("reload", "true"); // Used to simplify reproduction of the
// race condition
options.put("org.apache.activemq.jaas.properties.user", USERS_FILE);
options.put("org.apache.activemq.jaas.properties.role", ROLES_FILE);
options.put("baseDir", temp.getRoot().getAbsolutePath());
errors = new ArrayBlockingQueue<>(processorCount());
pool = Executors.newFixedThreadPool(processorCount(), ActiveMQThreadFactory.defaultThreadFactory());
callback = new JaasCallbackHandler(USERNAME, PASSWORD, null);
}
@Test
public void testCancelWait() throws Exception {
List<Runnable> lr = null;
JexlScript e = JEXL.createScript("wait(10)");
Callable<Object> c = e.callable(new TestContext());
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
Future<?> future = executor.submit(c);
Object t = 42;
try {
t = future.get(100, TimeUnit.MILLISECONDS);
Assert.fail("should have timed out");
} catch (TimeoutException xtimeout) {
// ok, ignore
future.cancel(true);
}
Assert.assertTrue(future.isCancelled());
Assert.assertEquals(42, t);
} finally {
lr = executor.shutdownNow();
}
Assert.assertTrue(lr.isEmpty());
}
@Test
public void testCompleteMultithreaded() throws Exception {
InboundDataClient client = CompletableFutureInboundDataClient.create();
Future<Void> waitingFuture =
Executors.newSingleThreadExecutor()
.submit(
() -> {
client.awaitCompletion();
return null;
});
try {
waitingFuture.get(50, TimeUnit.MILLISECONDS);
fail();
} catch (TimeoutException expected) {
// This should time out, as the client should never complete without external completion
}
client.complete();
// Blocks forever if the thread does not continue
waitingFuture.get();
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
if (rejectSubmission) {
throw new RejectedExecutionException();
}
AnnotatedRunnable runnable = new AnnotatedRunnable(command, initialDelay, period, unit, true);
runnableList.add(runnable);
FakeScheduledFuture<Unit> future =
new FakeScheduledFuture<Unit>(Executors.callable(runnable, null));
runnable.setFuture(future);
outstandingTasks.put(future, future);
return future;
}
public WebImageCache(Context context) {
// Set up in-memory cache store
memoryCache = new ConcurrentHashMap<String, SoftReference<Bitmap>>();
// Set up disk cache store
Context appContext = context.getApplicationContext();
diskCachePath = appContext.getCacheDir().getAbsolutePath() + DISK_CACHE_PATH;
File outFile = new File(diskCachePath);
outFile.mkdirs();
diskCacheEnabled = outFile.exists();
// Set up threadpool for image fetching tasks
writeThread = Executors.newSingleThreadExecutor();
}
public static void main(String[] args) throws Exception {
Phaser phaser = new Phaser(threads) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
// install new ClassLoader on each advance
classLoader = new CL();
return terminate;
}
};
ExecutorService exe = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++) {
exe.execute(() -> {
while (phaser.arriveAndAwaitAdvance() >= 0) {
Class<?> proxyClass = Proxy.getProxyClass(classLoader, Runnable.class);
if (!Proxy.isProxyClass(proxyClass)) {
racesDetected.incrementAndGet();
}
}
});
}
Thread.sleep(5000L);
terminate = true;
exe.shutdown();
exe.awaitTermination(5L, TimeUnit.SECONDS);
System.out.println(racesDetected.get() + " races detected");
if (racesDetected.get() != 0) {
throw new RuntimeException(racesDetected.get() + " races detected");
}
}
public void schedule() {
if (executor != null) {
throw new IllegalStateException("Registry Cleaner already scheduled");
}
executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(this::logStatistic, logIntervalMin, logIntervalMin, TimeUnit.MINUTES);
}
@Test
public void testGetSofaTracerDigestReporterAsyncManager() throws Exception {
final AtomicInteger npeCount = new AtomicInteger();
final AtomicInteger successCount = new AtomicInteger();
int testTimes = 1000;
int threadCount = 100;
final CountDownLatch latch = new CountDownLatch(testTimes);
for (int times = 0; times < testTimes; times++) {
Executors.newFixedThreadPool(threadCount).execute(new Runnable() {
@Override
public void run() {
try {
AsyncCommonDigestAppenderManager sofaTracerDigestReporterAsyncManager = SofaTracerDigestReporterAsyncManager
.getSofaTracerDigestReporterAsyncManager();
sofaTracerDigestReporterAsyncManager.append(sofaTracerSpan);
successCount.getAndIncrement();
} catch (NullPointerException e) {
npeCount.getAndIncrement();
} finally {
latch.countDown();
}
}
});
}
latch.await();
Assert.assertEquals(0, npeCount.get());
Assert.assertEquals(testTimes, successCount.get());
}
@BeforeClass
public static void init() throws Exception {
try {
executorService = Executors.newFixedThreadPool(5);
KafkaTestUtil.cleanLogDir();
KafkaTestUtil.setupKafkaBroker();
Thread.sleep(10000);
} catch (Exception e) {
throw new RemoteException("Exception caught when starting server", e);
}
}
@Override
public void initialize() {
api.registerforChangeEvents(this);
ScheduledExecutorService executerPool = Executors.newScheduledThreadPool(1);
executerPool.schedule(new InitializationRunnable(), 3, TimeUnit.SECONDS);
updateStatus(ThingStatus.ONLINE);
}
GaugeCollector(ConfigService configService, Collector collector,
LazyPlatformMBeanServer lazyPlatformMBeanServer,
final @Nullable Instrumentation instrumentation, Clock clock, Ticker ticker) {
this.configService = configService;
this.collector = collector;
this.lazyPlatformMBeanServer = lazyPlatformMBeanServer;
this.clock = clock;
this.ticker = ticker;
startTimeMillis = clock.currentTimeMillis();
collectionExecutor = Executors.newSingleThreadScheduledExecutor(
ThreadFactories.create("Glowroot-Gauge-Collection"));
flushingExecutor = Executors
.newSingleThreadExecutor(ThreadFactories.create("Glowroot-Gauge-Flushing"));
lazyPlatformMBeanServer.addInitListener(new InitListener() {
@Override
public void postInit(MBeanServer mbeanServer) {
try {
if (JavaVersion.isGreaterThanOrEqualToJava9() && instrumentation != null) {
Java9.grantAccess(instrumentation, "org.glowroot.agent.init.GaugeCollector",
"sun.management.ManagementFactoryHelper", true);
}
Class<?> sunManagementFactoryHelperClass =
Class.forName("sun.management.ManagementFactoryHelper");
Method registerInternalMBeansMethod = sunManagementFactoryHelperClass
.getDeclaredMethod("registerInternalMBeans", MBeanServer.class);
registerInternalMBeansMethod.setAccessible(true);
registerInternalMBeansMethod.invoke(null, mbeanServer);
} catch (Throwable t) {
logger.debug(t.getMessage(), t);
}
}
});
flushingExecutor.execute(new GaugeFlushingLoop());
}
/**
* Constructs a server with the specified configuration that listens to the
* specified ports after method {@link #start()} is called.
*
* @param config the configuration, if <code>null</code> the configuration returned by
* {@link NetworkConfig#getStandard()} is used.
* @param ports the ports to bind to
*/
public CoapServer(NetworkConfig config, int... ports) {
// global configuration that is passed down (can be observed for changes)
if (config != null) {
this.config = config;
} else {
this.config = NetworkConfig.getStandard();
}
// resources
this.root = createRoot();
this.deliverer = new ServerMessageDeliverer(root);
CoapResource well_known = new CoapResource(".well-known");
well_known.setVisible(false);
well_known.add(new DiscoveryResource(root));
root.add(well_known);
// endpoints
this.endpoints = new ArrayList<Endpoint>();
// sets the central thread pool for the protocol stage over all endpoints
this.executor = Executors.newScheduledThreadPool( config.getInt(NetworkConfig.Keys.PROTOCOL_STAGE_THREAD_COUNT) );
// create endpoint for each port
for (int port:ports)
addEndpoint(new CoapEndpoint(port, this.config));
}
private ScheduledExecutorService setupRefreshKeysJob(int refreshSeconds) {
// set up periodic timer to update keys from server every refreshSeconds;
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory());
scheduler.scheduleAtFixedRate(() -> {
try {
fetchKeys();
} catch (Exception e) {
// Log, but don't rethrow the exception to prevent scheduler cancelling the scheduled job.
log.error(e.getMessage(), e);
}
}, refreshSeconds, refreshSeconds, TimeUnit.SECONDS);
return scheduler;
}
MessageRecever(Socket getClientSocket,ClientListListener getClientListListener ,ClientWindowListener getClientWindowListener,ClientManager getClientManager)
{
clientExecutor=Executors.newCachedThreadPool();
clientManager=getClientManager;
clientSocket=getClientSocket;
try
{
input = new ObjectInputStream(getClientSocket.getInputStream());
}
catch (IOException ex)
{}
clientListListener=getClientListListener;
clientWindowListener=getClientWindowListener;
}
private ExecutorService getSaveExecutor(boolean saveSources, boolean saveResources) {
if (root == null) {
throw new JadxRuntimeException("No loaded files");
}
int threadsCount = args.getThreadsCount();
LOG.debug("processing threads count: {}", threadsCount);
LOG.info("processing ...");
ExecutorService executor = Executors.newFixedThreadPool(threadsCount);
File sourcesOutDir;
File resOutDir;
if (args.isExportAsGradleProject()) {
ExportGradleProject export = new ExportGradleProject(root, args.getOutDir());
export.init();
sourcesOutDir = export.getSrcOutDir();
resOutDir = export.getResOutDir();
} else {
sourcesOutDir = args.getOutDirSrc();
resOutDir = args.getOutDirRes();
}
if (saveResources) {
appendResourcesSave(executor, resOutDir);
}
if (saveSources) {
appendSourcesSave(executor, sourcesOutDir);
}
return executor;
}
public Listener(final ServerSocket serverSocket, final RunMiNiFi runner) {
this.serverSocket = serverSocket;
this.executor = Executors.newFixedThreadPool(2, new ThreadFactory() {
@Override
public Thread newThread(final Runnable runnable) {
final Thread t = Executors.defaultThreadFactory().newThread(runnable);
t.setDaemon(true);
t.setName("MiNiFi Bootstrap Command Listener");
return t;
}
});
this.runner = runner;
}
@Test
public void testAddResourceConcurrency() throws Exception {
startEmptyStore();
final String key = "key1";
int count = 5;
ExecutorService exec = Executors.newFixedThreadPool(count);
List<Future<String>> futures = new ArrayList<Future<String>>(count);
final CountDownLatch start = new CountDownLatch(1);
for (int i = 0; i < count; i++) {
final String fileName = "foo-" + i + ".jar";
Callable<String> task = new Callable<String>() {
public String call() throws Exception {
start.await();
String result = store.addResource(key, fileName);
System.out.println("fileName: " + fileName + ", result: " + result);
return result;
}
};
futures.add(exec.submit(task));
}
// start them all at the same time
start.countDown();
// check the result; they should all agree with the value
Set<String> results = new HashSet<String>();
for (Future<String> future: futures) {
results.add(future.get());
}
assertSame(1, results.size());
exec.shutdown();
}
@Override
protected AbstractCompactor<?> createCompactor(SortedOplogFactory factory) throws IOException {
return new SizeTieredCompactor(factory,
NonCompactor.createFileset("test", new File(".")),
new FileTracker(),
Executors.newSingleThreadExecutor(),
2, 4);
}
/**
* Test multiple nodes issuing different proposals in parallel
*/
@Test
@Parameters(method = "nValues")
@TestCaseName("{method}[N={0}]")
public void testRecoveryForSinglePropose(final int numNodes) throws InterruptedException {
final ExecutorService executorService = Executors.newFixedThreadPool(numNodes);
final LinkedBlockingDeque<List<Endpoint>> decisions = new LinkedBlockingDeque<>();
final Consumer<List<Endpoint>> onDecide = decisions::add;
final Map<Endpoint, FastPaxos> instances = createNFastPaxosInstances(numNodes, onDecide);
final Map.Entry<Endpoint, FastPaxos> any = instances.entrySet().stream().findAny().get();
final List<Endpoint> proposal = Collections.singletonList(Utils.hostFromString("172.14.12.3:1234"));
executorService.execute(() -> any.getValue().propose(proposal, 50));
waitAndVerifyAgreement(numNodes, 20, 50, decisions);
assertAll(proposal, decisions);
}
public static void main(String[] args) throws Exception {
Server server = new Server(
BaseConfiguration.getCLSIDConnectionInfomation(),
Executors.newSingleThreadScheduledExecutor());
server.connect();
/**
* 其中100单位为毫秒,为每次从OPC获取刷新的间隔时间
*/
AccessBase access = new Async20Access(server, PERIOD, false);
/**
* 只有Item的值有变化的时候才会触发CallBack函数
*/
access.addItem("Random.Real5", new DataCallback() {
private int count;
public void changed(Item item, ItemState itemstate) {
System.out.println("[" + (++count) + "],ItemName:["
+ item.getId() + "],value:" + itemstate.getValue());
}
});
/** 开始监听 */
access.bind();
/** 当前线程休眠时间单位:毫秒 */
Thread.sleep(SLEEP);
/** 监听 结束 */
access.unbind();
server.dispose();
}
public boolean initialize() {
// 加载KV配置
this.kvConfigManager.load();
// 初始化通信层
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); //brokerHousekeepingService 接收Broker连接事件
// 初始化固定线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//注册接收到请求之后具体的处理
this.registerProcessor();
// 增加定时任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS); //每隔10s扫描broker,维护当前存活的Broker信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES); //每隔10s打印KVConfig信息。
return true;
}
public KafkaLocationManager(final ZooKeeperHolder zkFactory, final KafkaSettings kafkaSettings) {
this.zkFactory = zkFactory;
this.kafkaProperties = new Properties();
this.kafkaSettings = kafkaSettings;
this.ipAddressChangeListeners = ConcurrentHashMap.newKeySet();
this.updateBootstrapServers(true);
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
this.scheduledExecutor.scheduleAtFixedRate(() -> updateBootstrapServersSafe(false), 1, 1, TimeUnit.MINUTES);
}
/**
* Creates an executor that will use at most <var>nThreads</var> threads.
* @param nThreads the number of threads, or zero for default count (which is number of core)
*/
public WaitableExecutor(int nThreads) {
if (nThreads < 1) {
nThreads = Runtime.getRuntime().availableProcessors();
}
mExecutorService = Executors.newFixedThreadPool(nThreads);
mCompletionService = new ExecutorCompletionService<T>(mExecutorService);
}
/**
* Http Server
*/
public void startHttpServer() throws IOException {
httpServer = com.sun.net.httpserver.HttpServer.create(new InetSocketAddress(0), 0);
// create HttpServer context
HttpContext ctx = httpServer.createContext("/test/", new MyHandler());
executorService = Executors.newCachedThreadPool();
httpServer.setExecutor(executorService);
httpServer.start();
}
void start(final int idleTimeMilliSeconds, final SocketAddress tcpAddress,
final SocketAddress udpAddress) {
tcpServer = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
tcpServer.setPipelineFactory(new ChannelPipelineFactory() {
private final HashedWheelTimer timer = new HashedWheelTimer();
private final IdleStateHandler idleStateHandler = new IdleStateHandler(
timer, 0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
RpcUtil.STAGE_RPC_TCP_RESPONSE);
}
});
udpServer = new ConnectionlessBootstrap(new NioDatagramChannelFactory(
Executors.newCachedThreadPool()));
udpServer.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
handler, RpcUtil.STAGE_RPC_UDP_RESPONSE));
tcpChannel = tcpServer.bind(tcpAddress);
udpChannel = udpServer.bind(udpAddress);
allChannels.add(tcpChannel);
allChannels.add(udpChannel);
LOG.info("Portmap server started at tcp://" + tcpChannel.getLocalAddress()
+ ", udp://" + udpChannel.getLocalAddress());
}