下面列出了org.junit.runners.model.InitializationError#java.util.concurrent.ExecutorService 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * A privileged exception action that throws after being submitted reports
 * its exception as the cause of the ExecutionException raised by get().
 */
public void testSubmitFailedPrivilegedExceptionAction() throws Exception {
    Runnable check = new CheckedRunnable() {
        public void realRun() throws Exception {
            ExecutorService executor = new DirectExecutorService();
            // Always fails with an unchecked exception when run.
            PrivilegedExceptionAction failingAction = new PrivilegedExceptionAction() {
                public Object run() throws Exception {
                    throw new IndexOutOfBoundsException();
                }};
            Future pending = executor.submit(Executors.callable(failingAction));
            try {
                pending.get();
                shouldThrow();
            } catch (ExecutionException success) {
                Throwable cause = success.getCause();
                assertTrue(cause instanceof IndexOutOfBoundsException);
            }
        }};
    runWithPermissions(check);
}
/**
 * Shuts the executor down, escalating to {@code shutdownNow()} when it has
 * not terminated after {@code TIMEOUT_MS} milliseconds.
 *
 * @param executor the executor to stop
 * @return {@code true} if the executor terminated within one of the two
 *         waiting periods; {@code false} otherwise, including when the
 *         calling thread is interrupted while waiting (the interrupt flag
 *         is restored in that case)
 */
private static boolean shutdownExecutor(ExecutorService executor) {
    boolean terminated = false;
    executor.shutdown();
    try {
        terminated = executor.awaitTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS);
        if (!terminated) {
            // Graceful shutdown timed out: cancel what is left and wait once more.
            executor.shutdownNow();
            terminated = executor.awaitTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS);
        }
    } catch (InterruptedException ex) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
        terminated = false;
    }
    return terminated;
}
/**
 * Execute the verifier for the given duration.
 *
 * <p>
 * This provides a simple way to execute the verifier for those applications
 * which do not wish to manage threads directly.
 *
 * <p>
 * The verifier is submitted to a private single-thread executor. The calling
 * thread waits until the verifier completes or the duration elapses, whichever
 * comes first, then calls {@code stop()} and waits for the final result. If
 * the calling thread is interrupted while waiting, {@code stop()} is still
 * called and the interrupt flag is restored before this method returns or
 * throws.
 *
 * @param duration amount of time to execute
 * @param unit units used to express the duration
 * @return number of database rows successfully verified
 * @throws IllegalStateException if the verifier task fails, or if the calling
 *         thread is interrupted while waiting for the final result
 */
public long runFor(final long duration, final TimeUnit unit) {
    final ExecutorService es = Executors.newSingleThreadExecutor();
    boolean interrupted = false;
    try {
        final Future<Long> future = es.submit(this);
        try {
            // A timed get wakes as soon as the task completes and never waits
            // longer than the requested duration, regardless of the unit used.
            future.get(duration, unit);
        } catch (final java.util.concurrent.TimeoutException expected) {
            // Time budget exhausted; stop() below asks the verifier to finish.
        } catch (final ExecutionException deferred) {
            // The failure is reported by the untimed get() below.
        } catch (final InterruptedException ex) {
            // Remember the interrupt; it is restored once the result is collected.
            interrupted = true;
        } finally {
            stop();
        }
        try {
            return future.get();
        } catch (final InterruptedException ex) {
            interrupted = true;
            throw new IllegalStateException(ex);
        } catch (final ExecutionException ex) {
            throw new IllegalStateException(ex);
        }
    } finally {
        es.shutdown();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Calls {@code get()} 1000 times from a ten-thread pool and asserts that
 * {@code tryCompute()} ran exactly once.
 */
@Test(timeout = 1000L)
public void get_memoizes() throws InterruptedException {
    final int getCount = 1000;
    final AtomicInteger provisionCount = new AtomicInteger();
    final StorageConsumer storageConsumer = new StorageConsumer() {
        @Override protected StorageComponent tryCompute() {
            provisionCount.incrementAndGet();
            return new InMemoryStorage();
        }
    };
    final CountDownLatch remaining = new CountDownLatch(getCount);
    // One shared task: each run performs a get() and reports completion.
    final Runnable getThenCountDown = () -> {
        storageConsumer.get();
        remaining.countDown();
    };
    final ExecutorService exec = Executors.newFixedThreadPool(10);
    int submitted = 0;
    while (submitted < getCount) {
        exec.execute(getThenCountDown);
        submitted++;
    }
    remaining.await();
    exec.shutdown();
    exec.awaitTermination(1, TimeUnit.SECONDS);
    assertThat(provisionCount.get()).isEqualTo(1);
}
/**
 * Creates the fixed-size cleanup executor and registers it with the life
 * cycle registry.
 *
 * <p>The work queue is bounded by {@code MAX_CLEANUP_QUEUE_LENGTH}; a task
 * rejected by the pool is dropped and counted on the
 * {@code discarded_slab_cleanup} meter.
 *
 * @param metricsGroup metric name prefix; the part after its last {@code '.'} is used in thread names
 * @param lifeCycle registry that takes over management of the executor
 * @param metricRegistry registry supplying the discard meter
 * @return the newly created executor
 */
private static ExecutorService defaultCleanupExecutor(String metricsGroup, LifeCycleRegistry lifeCycle, MetricRegistry metricRegistry) {
    final Meter meter = metricRegistry.meter(MetricRegistry.name(metricsGroup, "AstyanaxEventReaderDAO", "discarded_slab_cleanup"));
    String groupSuffix = metricsGroup.substring(metricsGroup.lastIndexOf('.') + 1);
    String nameFormat = "Events Slab Reader Cleanup-" + groupSuffix + "-%d";

    LinkedBlockingQueue<Runnable> pendingCleanups = new LinkedBlockingQueue<Runnable>(MAX_CLEANUP_QUEUE_LENGTH);
    // Same silent drop as DiscardPolicy, but every drop is recorded.
    ThreadPoolExecutor.DiscardPolicy countingDiscard = new ThreadPoolExecutor.DiscardPolicy() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            meter.mark();
        }
    };

    ExecutorService executor = new ThreadPoolExecutor(
            NUM_CLEANUP_THREADS, NUM_CLEANUP_THREADS,
            0L, TimeUnit.MILLISECONDS,
            pendingCleanups,
            new ThreadFactoryBuilder().setNameFormat(nameFormat).build(),
            countingDiscard);
    lifeCycle.manage(new ExecutorServiceManager(executor, Duration.seconds(5), nameFormat));
    return executor;
}
/**
 * invokeAny(c) throws ExecutionException when the only task fails, with the
 * task's exception as the cause
 */
public void testInvokeAny4() throws Exception {
    final ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10);
    final ExecutorService pool =
        new ThreadPoolExecutor(2, 2, LONG_DELAY_MS, MILLISECONDS, workQueue);
    try (PoolCleaner cleaner = cleaner(pool)) {
        List<Callable<String>> failingTasks = new ArrayList<Callable<String>>();
        failingTasks.add(new NPETask());
        try {
            pool.invokeAny(failingTasks);
            shouldThrow();
        } catch (ExecutionException success) {
            Throwable cause = success.getCause();
            assertTrue(cause instanceof NullPointerException);
        }
    }
}
/**
 * Fetches the order summary asynchronously on a dedicated single-thread
 * executor, blocks until it is available, and logs it.
 *
 * <p>The executor is shut down in a {@code finally} block so its worker
 * thread is released even when waiting for the result fails.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws ExecutionException if the asynchronous computation fails
 */
public static void fetchOrderSummaryExecutor()
        throws InterruptedException, ExecutionException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        CompletableFuture<String> cfOrderSummary = CompletableFuture.supplyAsync(() -> {
            try {
                logger.info(() -> "Fetch order summary by: " + Thread.currentThread().getName());
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Order Summary #91022";
        }, executor);
        String summary = cfOrderSummary.get(); // wait for summary to be available, this is blocking
        logger.info(() -> "Order summary: " + summary + "\n");
    } finally {
        executor.shutdownNow();
    }
}
/**
 * Runs 1000 asynchronous tasks, one of which (i == 19) throws, and asserts
 * that the combined future reports that exception as the failure cause.
 *
 * <p>The pool is shut down in a {@code finally} block so its ten worker
 * threads do not outlive the test. The handler tolerates a missing failure,
 * so an unexpected success shows up as a failed assertion rather than as a
 * NullPointerException inside the callback.
 */
@Test
public void failsFastOnAnyFutureFailure() throws ExecutionException, InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(10);
    try {
        IllegalStateException expectedException = new IllegalStateException("19! Aaargh!");
        CompletableFuture<List<Integer>> integers = IntStream.range(0, 1000)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(random.nextInt(100));
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    if (i == 19) {
                        throw expectedException;
                    }
                    return i;
                }, threadPool))
                .collect(CompletableFutures.toFutureList());
        AtomicReference<Throwable> exc = new AtomicReference<>();
        integers.handle((success, failure) -> {
            // failure is null when the combined future completed normally.
            exc.set(failure == null ? null : failure.getCause());
            return null;
        }).get();
        assertThat(exc.get(), equalTo(expectedException));
    } finally {
        // Remaining tasks are irrelevant once the outcome has been asserted.
        threadPool.shutdownNow();
    }
}
/**
 * Generates service tickets for a single ticket-granting ticket from
 * {@code CONCURRENT_SIZE} generators in parallel and asserts that every
 * generator produced a non-null result.
 *
 * <p>Any exception raised while generating is logged and rethrown as an
 * {@link AssertionError} carrying the original exception as its cause, so
 * the test report shows why generation failed.
 */
@Test
@IfProfileValue(name="cas.jpa.concurrent", value="true")
public void verifyConcurrentServiceTicketGeneration() throws Exception {
    final TicketGrantingTicket newTgt = newTGT();
    addTicketInTransaction(newTgt);
    final ExecutorService executor = Executors.newFixedThreadPool(CONCURRENT_SIZE);
    try {
        final List<ServiceTicketGenerator> generators = new ArrayList<>(CONCURRENT_SIZE);
        for (int i = 0; i < CONCURRENT_SIZE; i++) {
            generators.add(new ServiceTicketGenerator(newTgt.getId(), this.jpaTicketRegistry, this.txManager));
        }
        final List<Future<String>> results = executor.invokeAll(generators);
        for (final Future<String> result : results) {
            assertNotNull(result.get());
        }
    } catch (final Exception e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt for whoever runs this test.
            Thread.currentThread().interrupt();
        }
        logger.debug("testConcurrentServiceTicketGeneration produced an error", e);
        // Keep the cause attached instead of failing with a bare message.
        throw new AssertionError("testConcurrentServiceTicketGeneration failed.", e);
    } finally {
        executor.shutdownNow();
    }
}
/**
 * Get the pool ready to use, creating it first when it is not active.
 *
 * @return the pool
 * @throws ProcessingCancellationException if pool creation is no longer allowed
 */
public synchronized ExecutorService getPool ()
{
    if (creationAllowed) {
        if (!isActive()) {
            logger.debug("Creating pool: {}", getName());
            pool = createPool();
        }

        return pool;
    }

    logger.info("No longer allowed to create pool: {}", getName());
    throw new ProcessingCancellationException("Executor closed");
}
/**
 * Creates the executor service used to run map tasks.
 *
 * <p>The pool size is the configured {@code LOCAL_MAX_MAPS} value (default 1),
 * capped at the number of map tasks and never less than one.
 *
 * @return an ExecutorService instance that handles map tasks
 * @throws IllegalArgumentException if the configured value is below 1
 */
protected synchronized ExecutorService createMapExecutor() {
    // Determine the size of the thread pool to use
    final int configuredThreads = job.getInt(LOCAL_MAX_MAPS, 1);
    if (configuredThreads < 1) {
        throw new IllegalArgumentException(
            "Configured " + LOCAL_MAX_MAPS + " must be >= 1");
    }
    // Never more threads than tasks, but at least one in case of no tasks.
    final int maxMapThreads =
        Math.max(Math.min(configuredThreads, this.numMapTasks), 1);

    LOG.debug("Starting mapper thread pool executor.");
    LOG.debug("Max local threads: " + maxMapThreads);
    LOG.debug("Map tasks to process: " + this.numMapTasks);

    // Create a new executor service to drain the work queue.
    return Executors.newFixedThreadPool(
        maxMapThreads,
        new ThreadFactoryBuilder()
            .setNameFormat("LocalJobRunner Map Task Executor #%d")
            .build());
}
/**
 * Execute multiple tasks concurrently on a dedicated fixed-size pool.
 *
 * <p>The pool is shut down before this method returns. Tasks already
 * submitted still run to completion, so the returned futures remain usable,
 * and the pool's threads are released once the work is done.
 *
 * @param numThreads The number of available threads for concurrent execution.
 * @param tasks The tasks to execute.
 * @param staggerTasks If true, add a 100 ms delay after each task submission.
 * @param <T> The return type of the task.
 * @return One future per task, in the order of {@code tasks}. Without
 *         staggering every future is already complete on return.
 * @throws InterruptedException if the calling thread is interrupted while
 *         submitting or waiting for the tasks.
 */
private <T> List<Future<T>> executeConcurrentTasks(int numThreads, List<Callable<T>> tasks, boolean staggerTasks) throws InterruptedException
{
    final ExecutorService service = Executors.newFixedThreadPool(numThreads);
    try {
        if (!staggerTasks) {
            return service.invokeAll(tasks);
        }

        // Necessary to preserve order of requests for certain tests.
        List<Future<T>> futures = new ArrayList<>(tasks.size());
        for (Callable<T> task : tasks) {
            futures.add(service.submit(task));
            Thread.sleep(100);
        }
        return futures;
    }
    finally {
        // Orderly shutdown: no new tasks accepted, submitted ones still finish.
        service.shutdown();
    }
}
/**
 * Groups the extracted data by output file and submits one
 * {@code FileWriterTask} per file to the executor.
 *
 * <p>Entries for which the output strategy returns no file are skipped.
 * Both maps are walked through their entry sets, so each value is read
 * without a second lookup by key.
 *
 * @param executor executor that receives the writer tasks
 * @param extract extracted objects keyed by a (finder, object) pair
 */
private void writeToFiles(ExecutorService executor, Map<Pair<RelatedFinder, Object>, List<MithraDataObject>> extract)
{
    UnifiedMap<File, UnifiedMap<RelatedFinder, List<MithraDataObject>>> dataByFile = UnifiedMap.newMap();
    for (Map.Entry<Pair<RelatedFinder, Object>, List<MithraDataObject>> extracted : extract.entrySet())
    {
        Pair<RelatedFinder, Object> key = extracted.getKey();
        File outputFile = this.outputStrategy.getOutputFile(key.getOne(), key.getTwo());
        if (outputFile == null)
        {
            continue;
        }
        dataByFile.getIfAbsentPut(outputFile, UnifiedMap.<RelatedFinder, List<MithraDataObject>>newMap()).put(key.getOne(), extracted.getValue());
    }
    for (Map.Entry<File, UnifiedMap<RelatedFinder, List<MithraDataObject>>> perFile : dataByFile.entrySet())
    {
        executor.submit(new FileWriterTask(perFile.getKey(), perFile.getValue()));
    }
}
/**
 * accumulates by multiple threads produce correct result
 *
 * The pool is shut down in a finally block so that a failed assertion does
 * not leave its threads behind.
 */
public void testAccumulateAndGetMT() {
    final int incs = 1000000;
    final int nthreads = 4;
    final ExecutorService pool = Executors.newCachedThreadPool();
    try {
        DoubleAccumulator a = new DoubleAccumulator(Double::max, 0.0);
        // One party per worker plus this thread.
        Phaser phaser = new Phaser(nthreads + 1);
        for (int i = 0; i < nthreads; ++i) {
            pool.execute(new AccTask(a, phaser, incs));
        }
        phaser.arriveAndAwaitAdvance();
        phaser.arriveAndAwaitAdvance();
        double expected = incs - 1;
        double result = a.get();
        assertEquals(expected, result);
    } finally {
        pool.shutdown();
    }
}
public ThriftConnection(Configuration conf, ExecutorService pool, final User user)
throws IOException {
this.conf = conf;
this.user = user;
this.host = conf.get(Constants.HBASE_THRIFT_SERVER_NAME);
this.port = conf.getInt(Constants.HBASE_THRIFT_SERVER_PORT, -1);
Preconditions.checkArgument(port > 0);
Preconditions.checkArgument(host != null);
this.isFramed = conf.getBoolean(Constants.FRAMED_CONF_KEY, Constants.FRAMED_CONF_DEFAULT);
this.isCompact = conf.getBoolean(Constants.COMPACT_CONF_KEY, Constants.COMPACT_CONF_DEFAULT);
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.connectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
String className = conf.get(Constants.HBASE_THRIFT_CLIENT_BUIDLER_CLASS,
DefaultThriftClientBuilder.class.getName());
try {
Class<?> clazz = Class.forName(className);
Constructor<?> constructor = clazz
.getDeclaredConstructor(ThriftConnection.class);
constructor.setAccessible(true);
clientBuilder = (ThriftClientBuilder) constructor.newInstance(this);
}catch (Exception e) {
throw new IOException(e);
}
}
/**
 * For an asynchronous exchange with an executor stored in the request
 * context, registers that executor on the exchange and installs a message
 * observer that hands incoming messages to a {@code ClientMessageObserver}.
 *
 * <p>Messages are dispatched through the executor unless the message's
 * exchange already carries the {@code Executor} "USING_SPECIFIED" key, in
 * which case the observer is invoked directly on the calling thread.
 *
 * @param exchange the exchange to prepare
 */
protected void setAsyncMessageObserverIfNeeded(Exchange exchange) {
    if (exchange.isSynchronous()) {
        return;
    }
    ExecutorService executor = (ExecutorService)cfg.getRequestContext().get(EXECUTOR_SERVICE_PROPERTY);
    if (executor == null) {
        return;
    }

    exchange.put(Executor.class, executor);
    final ClientMessageObserver observer = new ClientMessageObserver(cfg);
    final String usingSpecifiedKey = Executor.class.getName() + ".USING_SPECIFIED";
    exchange.put(MessageObserver.class, message -> {
        if (message.getExchange().containsKey(usingSpecifiedKey)) {
            observer.onMessage(message);
        } else {
            executor.execute(() -> observer.onMessage(message));
        }
    });
}
/**
 * Opens 400 connections from a cached thread pool and fails if any worker
 * recorded an exception. Waits up to 30 seconds for the workers to finish.
 */
@Ignore("See AMQ-4286")
@Test(timeout = 60 * 1000)
public void testLotsOfConcurrentConnections() throws Exception {
    final int connectionCount = 400;
    final ConnectionFactory factory = createConnectionFactory();
    final AtomicInteger threadId = new AtomicInteger(0);

    // Shared task: every execution opens, starts and records one connection.
    final Runnable openConnection = new Runnable() {
        @Override
        public void run() {
            final int innerId = threadId.incrementAndGet();
            try {
                ExceptionListener listener = new NioQueueSubscriptionTestListener(innerId, exceptions, LOG);
                ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
                connection.setExceptionListener(listener);
                connection.start();
                assertNotNull(connection.getBrokerName());
                connections.add(connection);
            } catch (Exception e) {
                LOG.error(">>>> Exception in run() on thread " + innerId, e);
                exceptions.put(Thread.currentThread(), e);
            }
        }
    };

    ExecutorService executor = Executors.newCachedThreadPool();
    for (int submitted = 0; submitted < connectionCount; submitted++) {
        executor.execute(openConnection);
    }
    executor.shutdown();
    executor.awaitTermination(30, TimeUnit.SECONDS);

    if (!exceptions.isEmpty()) {
        LOG.error(">>>> " + exceptions.size() + " exceptions like", exceptions.values().iterator().next());
        fail("unexpected exceptions in worker threads: " + exceptions.values().iterator().next());
    }
    LOG.info("created " + connectionCount + " connections");
}
/**
 * Starts an HTTP server with basic authentication on an ephemeral port, runs
 * two {@code Runner} threads against it one after the other, and fails if
 * {@code error} is set or {@code count} is not 2 afterwards.
 *
 * <p>The server and its executor are released in a {@code finally} block so
 * they are also cleaned up when a runner fails or the main thread is
 * interrupted.
 *
 * @param args unused
 * @throws Exception if the server cannot be created or the test fails
 */
public static void main (String[] args) throws Exception {
    Handler handler = new Handler();
    InetSocketAddress addr = new InetSocketAddress (0);
    HttpServer server = HttpServer.create(addr, 0);
    HttpContext ctx = server.createContext("/test", handler);
    // NOTE(review): the realm literal looks like an e-mail-obfuscation
    // placeholder left by the page this code was copied from - confirm the
    // intended realm.
    BasicAuthenticator a = new BasicAuthenticator("[email protected]") {
        @Override
        public boolean checkCredentials (String username, String pw) {
            // startsWith also copes with an empty password, where charAt(0) would throw.
            return "fred".equals(username) && pw.startsWith("x");
        }
    };
    ctx.setAuthenticator(a);
    ExecutorService executor = Executors.newCachedThreadPool();
    server.setExecutor(executor);
    server.start ();
    try {
        java.net.Authenticator.setDefault(new MyAuthenticator());

        System.out.print("Deadlock: " );
        for (int i=0; i<2; i++) {
            Runner t = new Runner(server, i);
            t.start();
            t.join();
        }
    } finally {
        server.stop(2);
        executor.shutdown();
    }
    if (error) {
        throw new RuntimeException("test failed error");
    }
    if (count != 2) {
        throw new RuntimeException("test failed count = " + count);
    }
    System.out.println("OK");
}
/**
 * Wires up a FIX client over a TCP transport for the given address.
 *
 * <p>Socket work runs on a single-thread executor whose thread factory is
 * created with the name "InboundConnection". The transport is registered as
 * an observer of the connection observer it publishes to.
 *
 * @param socketAddress address handed to the transport
 * @param systemConfig configuration handed to the transport
 * @return the client built on top of the transport
 */
private static FixClient createFixClient(final InetSocketAddress socketAddress, final SystemConfig systemConfig)
{
    final PublishingConnectionObserver connectionObserver = new PublishingConnectionObserver();

    final NamedThreadFactory inboundThreadFactory =
        new NamedThreadFactory("InboundConnection", true, UNCAUGHT_EXCEPTION_HANDLER);
    final ExecutorService socketExecutor = Executors.newSingleThreadExecutor(inboundThreadFactory);
    final AsyncTcpSocketFactory socketFactory = new AsyncTcpSocketFactory(socketExecutor);

    final TcpTransport tcpTransport =
        new TcpTransport(connectionObserver, socketAddress, socketFactory, systemConfig);
    connectionObserver.addObserver(tcpTransport);

    return buildFixClient(tcpTransport, connectionObserver, MAX_MESSAGE_SIZE);
}
/**
 * A new newFixedThreadPool can execute runnables: three no-op tasks are
 * handed to a two-thread pool
 */
public void testNewFixedThreadPool1() {
    final int taskCount = 3;
    final ExecutorService pool = Executors.newFixedThreadPool(2);
    try (PoolCleaner cleaner = cleaner(pool)) {
        for (int submitted = 0; submitted < taskCount; submitted++) {
            pool.execute(new NoOpRunnable());
        }
    }
}
/**
 * Returns the shared executor, creating it on first use.
 *
 * <p>The executor is a cached thread pool whose threads are daemon threads.
 *
 * @return the shared executor instance
 */
private static synchronized ExecutorService getExecutorService() {
    if (executorInstance != null) {
        return executorInstance;
    }
    executorInstance = Executors.newCachedThreadPool(new ThreadFactory() {
        public Thread newThread(Runnable r) {
            Thread daemonThread = new Thread(r);
            daemonThread.setDaemon(true);
            return daemonThread;
        }
    });
    return executorInstance;
}
/**
 * Resets the counter, submits {@code NUM_INCREMENTS} calls of
 * {@code incrementSync} to a two-thread pool, stops the pool and prints the
 * resulting count.
 */
private static void testLockSync() {
    count = 0;

    ExecutorService executor = Executors.newFixedThreadPool(2);
    for (int submitted = 0; submitted < NUM_INCREMENTS; submitted++) {
        executor.submit(ProgMainExecServiceLock07::incrementSync);
    }
    stop(executor);

    System.out.println(count);
}
/**
 * Closes the specified {@link ExecutorService}.
 *
 * <p>Requests an orderly shutdown and waits up to 5 seconds. If the executor
 * has not terminated by then, {@code shutdownNow()} is called and another
 * 5 seconds are allowed. When the executor still has not terminated and
 * debug logging is enabled, the number of tasks that were removed from the
 * queue without being run is logged.
 *
 * @param executorService The executorService; may be {@code null}.
 * @return True if shutdown is complete; false if the argument is
 *         {@code null}, the executor did not terminate in time, or the
 *         calling thread was interrupted while waiting (the interrupt
 *         status is preserved).
 */
public static boolean shutdownExecutorService(ExecutorService executorService) {
    if (executorService == null) {
        return false;
    }
    //If it hasn't already shutdown, do shutdown.
    if (!executorService.isShutdown()) {
        executorService.shutdown();
    }

    try {
        //Wait for clean termination
        if (executorService.awaitTermination(5, TimeUnit.SECONDS)) {
            return true;
        }

        //Not terminated yet: force it. Keep the returned list, because the
        //queue is drained by this call and a later shutdownNow() would
        //typically report nothing.
        List<Runnable> tasks = executorService.shutdownNow();

        if (executorService.awaitTermination(5, TimeUnit.SECONDS)) {
            return true;
        }

        if (LOGGER.isDebugEnabled() && !tasks.isEmpty()) {
            LOGGER.debug("ExecutorService was not cleanly shutdown, after waiting for 10 seconds. Number of remaining tasks:" + tasks.size());
        }
    } catch (InterruptedException e) {
        executorService.shutdownNow();
        //Preserve interrupted status
        Thread.currentThread().interrupt();
    }
    return false;
}
/**
 * Starts a {@code Producer} on a single-thread pool feeding a queue bounded
 * at 2000 entries, then initializes the digest, the random source, the three
 * strip charts and the frame rate.
 */
@Override
public void setup() {
    // Input side: the producer fills the queue that is published as input.
    BlockingQueue<State> stateQueue = new ArrayBlockingQueue<>(2000);
    input = stateQueue;
    Executors.newFixedThreadPool(1).submit(new Producer(stateQueue));

    speedDistribution = new AVLTreeDigest(300);
    noise = new Random();

    // Display side.
    speed = new Stripchart(10, 430, 460, 80, 1, 0, 0, 90);
    rpm = new Stripchart(10, 520, 460, 80, 1, 0, 0, 2200);
    throttle = new Stripchart(10, 610, 460, 80, 1, 0, 0, 100);

    frameRate(15);
}
/**
 * completed submit of callable returns result: the future of a submitted
 * StringTask yields TEST_STRING
 */
public void testSubmitCallable() throws Exception {
    final ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10);
    final ExecutorService pool =
        new CustomTPE(2, 2, LONG_DELAY_MS, MILLISECONDS, workQueue);
    try (PoolCleaner cleaner = cleaner(pool)) {
        Future<String> pending = pool.submit(new StringTask());
        assertSame(TEST_STRING, pending.get());
    }
}
/**
 * If timed poll returns non-null, the returned task is completed.
 * A poll before anything was submitted returns null.
 */
public void testPoll2() throws InterruptedException {
    final ExecutorService pool = Executors.newCachedThreadPool();
    final ExecutorCompletionService completionService = new ExecutorCompletionService(pool);
    try (PoolCleaner cleaner = cleaner(pool)) {
        assertNull(completionService.poll());
        Callable task = new StringTask();
        completionService.submit(task);
        Future polled = completionService.poll(SHORT_DELAY_MS, MILLISECONDS);
        // A timed-out poll (null) is acceptable; a returned future must be done.
        assertTrue(polled == null || polled.isDone());
    }
}
/**
 * Runs {@code QTY} workers that each create an {@code MRRunningJobManager}
 * and alternate between deleting and recovering the same application for
 * {@code REPETITIONS} rounds, then verifies that no error was logged.
 * Exceptions thrown inside a worker are caught and not reported.
 */
@Test
@Ignore
public void testMRRunningJobManagerRecoverYarnAppWithLock() throws Exception {
    Assert.assertTrue(curator.checkExists().forPath(SHARE_RESOURCES) != null);
    curator.setData().forPath(SHARE_RESOURCES, generateZkSetData());

    // Shared worker: every third round deletes, the other rounds recover.
    final Callable<Void> worker = () -> {
        try {
            MRRunningJobManager mrRunningJobManager = new MRRunningJobManager(zkStateConfig);
            for (int round = 0; round < REPETITIONS; ++round) {
                if (round % 3 != 0) {
                    mrRunningJobManager.recoverYarnApp("yarnAppId");
                } else {
                    mrRunningJobManager.delete("yarnAppId", "jobId");
                }
            }
        } catch (Exception e) {
            // log or do something
        }
        return null;
    };

    ExecutorService service = Executors.newFixedThreadPool(QTY);
    for (int submitted = 0; submitted < QTY; ++submitted) {
        service.submit(worker);
    }
    service.shutdown();
    service.awaitTermination(10, TimeUnit.MINUTES);

    verify(log, never()).error(anyString(), any(Throwable.class));
}
/**
 * Passes three callables that each create a trace entry marker to
 * {@code invokeAny} on a freshly created executor.
 */
@Override
public void transactionMarker() throws Exception {
    final int callableCount = 3;
    ExecutorService executor = createExecutorService();
    List<Callable<Void>> callables = Lists.newArrayList();
    for (int created = 0; created < callableCount; created++) {
        callables.add(new Callable<Void>() {
            @Override
            public Void call() {
                new CreateTraceEntry().traceEntryMarker();
                return null;
            }
        });
    }
    executor.invokeAny(callables);
}
/**
 * Fills all 50 slots of the delay queue from 50 pool threads, asserts that
 * none are left, then sleeps one second and asserts that slots are available
 * again and that another add succeeds.
 */
@Test
public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException {
    final int slots = 50;
    final DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
    final ExecutorService executorService = Executors.newFixedThreadPool(slots);

    for (int user = 0; user < slots; user++) {
        executorService.execute(delayQueue::tryAdd);
    }
    executorService.shutdown();
    executorService.awaitTermination(10, TimeUnit.SECONDS);
    assertEquals(0, delayQueue.availableSlots());

    // Give the queue time to release slots before checking again.
    Thread.sleep(1000);
    assertTrue(delayQueue.availableSlots() > 0);
    assertTrue(delayQueue.tryAdd());
}
/**
 * Runs the callback asynchronously on the given executor, unless the
 * "destructive async to normal sync" mode is active, in which case the
 * callback is handled by {@code destructiveNormalSync}.
 *
 * @param callback the asynchronous call to perform
 * @param service the executor the task is submitted to
 * @param title label combined with the executor hash expression to form the task keyword
 * @return a future wrapping the submitted task, or the result of the synchronous path
 */
protected YourFuture actuallyAsync(ConcurrentAsyncCall callback, ExecutorService service, String title) {
    // destructive (for e.g. UnitTest)
    if (isDestructiveAsyncToNormalSync()) {
        return destructiveNormalSync(callback);
    }

    // basically here
    final String keyword = title + buildExecutorHashExp(service);
    final Callable<WaitingAsyncResult> asyncTask = createCallableTask(callback, keyword);
    // real asynchronous
    final Future<WaitingAsyncResult> submitted = service.submit(asyncTask);
    return new BasicYourFuture(submitted);
}