类java.util.concurrent.ExecutorCompletionService源码实例Demo

下面列出了如何使用 java.util.concurrent.ExecutorCompletionService 类的 API 示例代码及写法，也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: hadoop-ozone   文件: RunningDatanodeState.java
/**
 * Schedules the work this state needs: one task per known endpoint, all
 * submitted through a fresh completion service backed by {@code executor}.
 *
 * @param executor -  ExecutorService
 */
@Override
public void execute(ExecutorService executor) {
  ecs = new ExecutorCompletionService<>(executor);
  for (EndpointStateMachine endpoint : connectionManager.getValues()) {
    Callable<EndPointStates> task = getEndPointTask(endpoint);
    if (task == null) {
      // No task is available for this endpoint. This can happen when an
      // earlier task outlived the timeout used in await and, on finishing,
      // moved the state to Shutdown; in that case the whole
      // DatanodeStateMachine has to be shut down as well.
      LOG.error("State is Shutdown in RunningDatanodeState");
      context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN);
      continue;
    }
    ecs.submit(task);
  }
}
 
/**
 * Runs every query task on a dedicated nine-thread pool and gathers the
 * results in completion order.
 *
 * <p>The pool is always shut down before this method returns, including when
 * a task fails, the caller is interrupted, or {@code failFastIfNeeded} throws;
 * previously those paths left the worker threads alive.
 *
 * @param tasks the queries to execute
 * @return one result pair per task, ordered by completion time
 * @throws InterruptedException if interrupted while waiting for a task
 * @throws java.util.concurrent.ExecutionException if a task itself threw
 */
private List<Pair<String, Throwable>> execAndGetResults(List<QueryCallable> tasks)
        throws InterruptedException, java.util.concurrent.ExecutionException {
    // NOTE(review): the work queue is bounded at 100 entries, so with the default
    // rejection policy a very large task list would be rejected on submit — confirm
    // that callers never pass more tasks than threads + queue capacity.
    ThreadPoolExecutor executor = new ThreadPoolExecutor(9//
            , 9 //
            , 1 //
            , TimeUnit.DAYS //
            , new LinkedBlockingQueue<Runnable>(100));
    try {
        CompletionService<Pair<String, Throwable>> service = new ExecutorCompletionService<>(executor);
        for (QueryCallable task : tasks) {
            service.submit(task);
        }

        List<Pair<String, Throwable>> results = new ArrayList<>(tasks.size());
        for (int i = 0; i < tasks.size(); i++) {
            Pair<String, Throwable> r = service.take().get();
            failFastIfNeeded(r);
            results.add(r);
        }
        return results;
    } finally {
        // Release the worker threads on every exit path.
        executor.shutdown();
    }
}
 
源代码3 项目: tds   文件: PoundTdsWmsTest.java
/**
 * Submits one HTTP request task per time-series string, repeated the requested
 * number of times, and records each returned future in {@code futures}.
 *
 * @return the number of requests that were submitted
 */
private int executeRequests(String baseUrl, String[] timeSeriesStrings, int timesToRepeatTimeSeriesRequests,
    HttpClient httpClient, ExecutorCompletionService<MakeHttpRequestResult> completionService,
    List<Future<MakeHttpRequestResult>> futures) {
  int submitted = 0;
  for (int round = 0; round < timesToRepeatTimeSeriesRequests; round++) {
    for (int idx = 0; idx < timeSeriesStrings.length; idx++) {
      String url = baseUrl + timeSeriesStrings[idx];
      // Third argument: idx plus round * number of strings.
      MakeHttpRequestCallable request =
          new MakeHttpRequestCallable(httpClient, url, idx + round * timeSeriesStrings.length);
      Future<MakeHttpRequestResult> pending = completionService.submit(request);
      submitted++;
      futures.add(pending);
    }
  }
  return submitted;
}
 
源代码4 项目: hbase   文件: TestIdLock.java
/**
 * Starts NUM_THREADS client tasks against the shared id lock, requires each
 * of them to report success, and then checks that the lock map is empty.
 */
@Test
public void testMultipleClients() throws Exception {
  final ExecutorService pool = Executors.newFixedThreadPool(NUM_THREADS);
  try {
    final ExecutorCompletionService<Boolean> completions = new ExecutorCompletionService<>(pool);
    int started = 0;
    while (started < NUM_THREADS) {
      completions.submit(new IdLockTestThread("client_" + started));
      ++started;
    }
    for (int pending = NUM_THREADS; pending > 0; --pending) {
      // take() blocks until the next client finishes, whichever it is.
      assertTrue(completions.take().get());
    }
    idLock.assertMapEmpty();
  } finally {
    pool.shutdown();
    pool.awaitTermination(5000, TimeUnit.MILLISECONDS);
  }
}
 
源代码5 项目: buck   文件: BuckStressRunner.java
/**
 * Runs {@code parallelism} instances of the {@code runners} in parallel. If one fails, no more
 * are scheduled, though inflight runs are not killed. The stdout for each command is written to a
 * file, and stderr is written to {@link System.err}.
 *
 * <p>The thread pool created for the run is shut down before this method returns, on both the
 * success and the failure path, so its worker threads do not outlive the call.
 *
 * @param runners The commands to run
 * @param outputDirectory The directory to write stdout files to
 * @param parallelism The number of runs to do at a time
 * @return The paths to each of the output files
 * @throws StressorException If any of the processes fail or their output cannot be written to
 *     temporary files
 */
public List<Path> run(List<BuckRunner> runners, Path outputDirectory, int parallelism)
    throws StressorException {
  ExecutorService service = Executors.newFixedThreadPool(parallelism);
  try {
    ExecutorCompletionService<Integer> completionService =
        new ExecutorCompletionService<>(service);
    // One "<index>.log" file per runner, in runner order.
    List<Path> filePaths =
        IntStream.range(0, runners.size())
            .mapToObj(i -> outputDirectory.resolve(Integer.toString(i) + ".log"))
            .collect(Collectors.toList());

    List<Future<Integer>> futures = queueRuns(completionService, filePaths, runners);
    String errorMessages = waitForFutures(completionService, futures);
    if (!errorMessages.isEmpty()) {
      throw new StressorException(errorMessages);
    }
    return filePaths;
  } finally {
    // shutdown() does not interrupt running tasks; it only stops the pool from
    // accepting new work and lets the workers exit once they are idle.
    service.shutdown();
  }
}
 
源代码6 项目: j2objc   文件: ExecutorCompletionServiceTest.java
/**
 * A non-null result from the untimed poll() is a task that has already completed.
 */
public void testPoll1() throws Exception {
    final ExecutorService e = Executors.newCachedThreadPool();
    final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
    try (PoolCleaner cleaner = cleaner(e)) {
        // Nothing has been submitted yet, so there is nothing to poll.
        assertNull(ecs.poll());
        Callable c = new StringTask();
        ecs.submit(c);

        final long startTime = System.nanoTime();
        Future f = ecs.poll();
        while (f == null) {
            if (millisElapsedSince(startTime) > LONG_DELAY_MS) {
                fail("timed out");
            }
            Thread.yield();
            f = ecs.poll();
        }
        assertTrue(f.isDone());
        assertSame(TEST_STRING, f.get());
    }
}
 
/**
 * Wires up a saga that walks the given task graph in two directions: a
 * transaction runner traversing from the root and a compensation runner
 * traversing from the leaf. The transaction runner is selected as the
 * current runner.
 *
 * @param eventStore store kept on this saga instance
 * @param executor executor backing the completion service handed to the transaction consumer
 * @param tasks saga tasks keyed by name, shared by both consumers
 * @param sagaContext context shared by both consumers
 * @param sagaTaskGraph graph of saga requests to traverse
 */
public GraphBasedSaga(EventStore eventStore,
    Executor executor,
    Map<String, SagaTask> tasks,
    SagaContext sagaContext,
    SingleLeafDirectedAcyclicGraph<SagaRequest> sagaTaskGraph) {

  this.eventStore = eventStore;
  this.tasks = tasks;

  // Forward direction: traverse from the root; the consumer submits through
  // a completion service built on the supplied executor.
  this.transactionTaskRunner = new TaskRunner(
      traveller(sagaTaskGraph, new FromRootTraversalDirection<SagaRequest>()),
      new TransactionTaskConsumer(
          tasks,
          sagaContext,
          new ExecutorCompletionService<Operation>(executor)));

  this.sagaContext = sagaContext;
  // Backward direction: traverse from the leaf; this consumer is given no executor.
  this.compensationTaskRunner = new TaskRunner(
      traveller(sagaTaskGraph, new FromLeafTraversalDirection<SagaRequest>()),
      new CompensationTaskConsumer(tasks, sagaContext));

  // Start with the transaction runner.
  currentTaskRunner = transactionTaskRunner;
}
 
源代码8 项目: swage   文件: StateCaptureTest.java
/**
 * Verifies that a {@code Runnable} submitted through the capturing decorator
 * returns the delegate's result and triggers the standard state captures.
 *
 * <p>The executor is shut down in a {@code finally} block so that a failed
 * assertion or a failing {@code get()} does not leave the pool running.
 */
@Test
public void testCompletionServiceRunnableCaptures() throws InterruptedException, Exception {
    // Setup
    ExecutorService executor = Executors.newCachedThreadPool();
    CapturedState mockCapturedState = mock(CapturedState.class);
    Runnable mockRunnable = mock(Runnable.class);
    try {
        CompletionService<Object> delegate = new ExecutorCompletionService<>(executor);
        CompletionService<Object> cs = StateCapture.capturingDecorator(delegate);

        ThreadLocalStateCaptor.THREAD_LOCAL.set(mockCapturedState);
        Object result = new Object();
        Future<Object> futureResult = cs.submit(mockRunnable, result);
        assertThat("Expected the delegate response to return",
            result, sameInstance(futureResult.get()));
    } finally {
        executor.shutdown();
    }

    verifyStandardCaptures(mockCapturedState, mockRunnable);
}
 
源代码9 项目: articles   文件: PreJava8Test.java
/**
 * Processes every integer in parallel and collects the results in the order
 * the tasks complete, then checks that the same elements come back in any order.
 *
 * <p>The ten-thread pool is shut down in a {@code finally} block; the original
 * never released it, leaving its threads alive after the test.
 */
@Test
void example_parallel_completion_order() throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(10);
    try {
        CompletionService<Integer> completionService =
          new ExecutorCompletionService<>(executor);

        for (Integer integer : integers) {
            completionService.submit(() -> Utils.process(integer));
        }

        List<Integer> results = new ArrayList<>(integers.size());

        // take() hands back futures in completion order, not submission order.
        for (int i = 0; i < integers.size(); i++) {
            results.add(completionService.take().get());
        }

        assertThat(results)
          .containsExactlyInAnyOrderElementsOf(integers);
    } finally {
        executor.shutdown();
    }
}
 
源代码10 项目: cyberduck   文件: ConcurrentTransferWorker.java
/**
 * Creates a transfer worker with its own thread pool and a completion
 * service for {@code TransferStatus} results on top of that pool.
 *
 * <p>Most arguments are passed straight to the superclass constructor; only
 * {@code source}, {@code destination} and {@code priority} are used here.
 *
 * @param source      session pool stored as the transfer source
 * @param destination session pool stored as the transfer destination
 * @param transfer    the transfer; its transfer type decides the pool size
 * @param priority    priority passed to the thread pool factory
 */
public ConcurrentTransferWorker(final SessionPool source,
                                final SessionPool destination,
                                final Transfer transfer,
                                final ThreadPool.Priority priority,
                                final TransferOptions options,
                                final TransferSpeedometer meter,
                                final TransferPrompt prompt,
                                final TransferErrorCallback error,
                                final ConnectionCallback connect,
                                final ProgressListener progressListener,
                                final StreamListener streamListener,
                                final NotificationService notification) {
    super(transfer, options, prompt, meter, error, progressListener, streamListener, connect, notification);
    this.source = source;
    this.destination = destination;
    // Pool name is "<random alphanumeric>-transfer". Size is 1 when the transfer type is
    // "newconnection", otherwise the value of the "queue.connections.limit" preference.
    this.pool = ThreadPoolFactory.get(String.format("%s-transfer", new AlphanumericRandomStringService().random()),
        transfer.getTransferType() == Host.TransferType.newconnection ? 1 : PreferencesFactory.get().getInteger("queue.connections.limit"), priority);
    this.completion = new ExecutorCompletionService<TransferStatus>(pool.executor());
}
 
源代码11 项目: phoenix   文件: UpsertSelectOverlappingBatchesIT.java
/**
 * Runs one wide UPSERT SELECT concurrently with four narrower, overlapping
 * ones and requires every runner to report success.
 *
 * <p>The pool is stopped in a {@code finally} block so that a failed runner or
 * assertion does not leave the remaining workers running.
 */
@Test
public void testUpsertSelectSameBatchConcurrently() throws Exception {
	try (Connection conn = driver.connect(url, props)) {
		int numUpsertSelectRunners = 5;
		ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
		try {
			CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
			List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
			// run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
			futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
			// run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT)
			for (int i = 0; i < 100; i += 25) {
				futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i + 25, 5)));
			}
			// Drain exactly as many completions as were submitted.
			int received = 0;
			while (received < futures.size()) {
				Future<Boolean> resultFuture = completionService.take();
				Boolean result = resultFuture.get();
				received++;
				assertTrue(result);
			}
		} finally {
			exec.shutdownNow();
		}
	}
}
 
源代码12 项目: quarks   文件: TStreamTest.java
/**
 * Waits until {@code numtasks} tasks have completed on the given completion
 * service, allowing up to four seconds for each one.
 *
 * <p>If the waiting thread is interrupted, the interrupt status is restored
 * and the wait is aborted with a {@code RuntimeException}. The previous
 * version printed the stack trace, dropped the interrupt, and still counted
 * the task as completed.
 *
 * @param completer completion service the tasks were submitted to
 * @param numtasks number of completions to wait for
 * @throws ExecutionException if a completed task itself threw
 * @throws RuntimeException wrapping a {@code TimeoutException} when no task completes
 *     within four seconds, or an {@code InterruptedException} when interrupted
 */
private void waitForCompletion(ExecutorCompletionService<Boolean> completer, int numtasks) throws ExecutionException {
    for (int remainingTasks = numtasks; remainingTasks > 0; remainingTasks--) {
        try {
            Future<Boolean> completed = completer.poll(4, TimeUnit.SECONDS);
            if (completed == null) {
                System.err.println("Completer timed out");
                throw new RuntimeException(new TimeoutException());
            }
            // Surfaces any failure of the task as ExecutionException.
            completed.get();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers and stop waiting.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
 
/**
 * Once a submitted task has completed, the untimed poll() hands back its
 * finished future.
 */
public void testPoll1()
    throws InterruptedException, ExecutionException {
    CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
    // Nothing submitted yet, so nothing can be polled.
    assertNull(cs.poll());
    cs.submit(new StringTask());

    final long startTime = System.nanoTime();
    Future f = cs.poll();
    while (f == null) {
        if (millisElapsedSince(startTime) > LONG_DELAY_MS) {
            fail("timed out");
        }
        Thread.yield();
        f = cs.poll();
    }
    assertTrue(f.isDone());
    assertSame(TEST_STRING, f.get());
}
 
/**
 * Once a submitted task has completed, the timed poll(timeout, unit) hands
 * back its finished future.
 */
public void testPoll2()
    throws InterruptedException, ExecutionException {
    CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
    // Nothing submitted yet, so nothing can be polled.
    assertNull(cs.poll());
    cs.submit(new StringTask());

    final long startTime = System.nanoTime();
    Future f = cs.poll(SHORT_DELAY_MS, MILLISECONDS);
    while (f == null) {
        if (millisElapsedSince(startTime) > LONG_DELAY_MS) {
            fail("timed out");
        }
        Thread.yield();
        f = cs.poll(SHORT_DELAY_MS, MILLISECONDS);
    }
    assertTrue(f.isDone());
    assertSame(TEST_STRING, f.get());
}
 
/**
 * Submits all solvers, takes results in completion order and uses the first
 * non-null one. Solvers that fail are skipped. Every submitted future is
 * cancelled before returning, whether or not a result was found.
 *
 * @param e executor that runs the solvers
 * @param solvers candidate computations
 * @throws InterruptedException if interrupted while waiting for a solver
 */
void solveAny(Executor e,
              Collection<Callable<Integer>> solvers)
    throws InterruptedException {
    final CompletionService<Integer> cs = new ExecutorCompletionService<>(e);
    final int n = solvers.size();
    final List<Future<Integer>> futures = new ArrayList<>(n);
    Integer result = null;
    try {
        for (Callable<Integer> solver : solvers) {
            futures.add(cs.submit(solver));
        }
        int pending = n;
        while (result == null && pending > 0) {
            pending--;
            try {
                // A null result leaves "result" unset, so the loop continues.
                result = cs.take().get();
            } catch (ExecutionException ignore) {
                // A failed solver is not an answer; try the next completion.
            }
        }
    } finally {
        for (Future<Integer> future : futures) {
            future.cancel(true);
        }
    }

    if (result != null) {
        use(result);
    }
}
 
源代码16 项目: reflection-util   文件: ImmutableProxyTest.java
/**
 * For 50 entities in turn, creates an immutable proxy of the same entity from
 * 20 concurrent tasks and checks each proxy is a distinct object with the
 * entity's number. The proxy cache is cleared after each entity.
 */
@Test
@Timeout(TEST_TIMEOUT_SECONDS)
void testConcurrentlyCreateProxy() throws Exception {
	final ExecutorService executorService = Executors.newFixedThreadPool(5);
	try {
		final CompletionService<TestEntity> proxies = new ExecutorCompletionService<>(executorService);
		int x = 0;
		while (x < 50) {
			final TestEntity entity = new TestEntity(100 + x);
			final int numRepetitions = 20;
			for (int submitted = 0; submitted < numRepetitions; submitted++) {
				proxies.submit(() -> ImmutableProxy.create(entity));
			}
			for (int remaining = numRepetitions; remaining > 0; remaining--) {
				final TestEntity immutableProxy = proxies.take().get();
				assertThat(immutableProxy).isNotSameAs(entity);
				assertThat(immutableProxy.getNumber()).isEqualTo(entity.getNumber());
			}
			ImmutableProxy.clearCache();
			x++;
		}
	} finally {
		executorService.shutdown();
		executorService.awaitTermination(TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
	}
}
 
源代码17 项目: presto   文件: GlueHiveMetastore.java
/**
 * Fetches the partitions of {@code table} that match {@code expression}.
 * With a single configured segment the fetch is done inline; otherwise one
 * fetch per segment is submitted to the executor and the results are merged
 * and sorted with {@code PARTITION_COMPARATOR}.
 */
private List<Partition> getPartitions(Table table, String expression)
{
    if (partitionSegments == 1) {
        return getPartitions(table, expression, null);
    }

    // Do parallel partition fetch.
    CompletionService<List<Partition>> fetches = new ExecutorCompletionService<>(executor);
    int segmentNumber = 0;
    while (segmentNumber < partitionSegments) {
        Segment segment = new Segment().withSegmentNumber(segmentNumber).withTotalSegments(partitionSegments);
        fetches.submit(() -> getPartitions(table, expression, segment));
        segmentNumber++;
    }

    List<Partition> merged = new ArrayList<>();
    try {
        for (int pending = partitionSegments; pending > 0; pending--) {
            merged.addAll(fetches.take().get());
        }
    }
    catch (InterruptedException e) {
        // Preserve the interrupt before converting to the metastore error.
        Thread.currentThread().interrupt();
        throw new PrestoException(HIVE_METASTORE_ERROR, "Failed to fetch partitions from Glue Data Catalog", e);
    }
    catch (ExecutionException e) {
        throw new PrestoException(HIVE_METASTORE_ERROR, "Failed to fetch partitions from Glue Data Catalog", e);
    }

    merged.sort(PARTITION_COMPARATOR);
    return merged;
}
 
源代码18 项目: stepchain   文件: StepAsyncExecutor.java
/**
 * Runs the given processors for {@code context} on a freshly created fixed
 * thread pool and returns their combined outcome.
 *
 * <p>The pool is shut down even when the processors throw; previously an
 * exception skipped the shutdown and left the worker threads alive.
 *
 * @param context input handed to the processors
 * @param processors processors to run; an empty collection yields {@code true}
 * @return the result reported by {@code executeByServiceThreadPool}, or {@code true} when there is nothing to run
 * @throws InterruptedException if interrupted while waiting for the processors or for pool termination
 * @throws ExecutionException if a processor failed
 */
@Override
public Boolean execute(I context, Collection<IProcessor<I, Boolean>> processors) throws InterruptedException, ExecutionException {
	Boolean results = true;

	if (!processors.isEmpty()) {
		ThreadPoolExecutor threadPoolExecutor = ThreadPoolFactory.newFixedThreadPool(parallelCount);
		try {
			CompletionService<Boolean> cService = new ExecutorCompletionService<>(threadPoolExecutor);
			results = executeByServiceThreadPool(cService, context, processors);
		} finally {
			threadPoolExecutor.shutdown();
		}
		// Only reached on success: give outstanding work time to drain.
		threadPoolExecutor.awaitTermination(11, TimeUnit.HOURS);
	}
	return results;
}
 
/**
 * Creates a scanner with per-segment bookkeeping sized to {@code segments}.
 *
 * @param executor executor wrapped by the completion service used for scan work
 * @param segments number of segments; sizes the finished bit set, the worker array and the future array
 * @param dynamoDbDelegate delegate stored on this scanner
 */
public ParallelScanner(final Executor executor, final int segments, final DynamoDbDelegate dynamoDbDelegate) {
    this.dynamoDbDelegate = dynamoDbDelegate;
    this.exec = new ExecutorCompletionService<>(executor);
    this.finished = new BitSet(segments);
    // A new BitSet already has all bits clear; this call makes that explicit.
    this.finished.clear();
    // Both arrays are only allocated here; their slots start out null.
    this.workers = new ScanSegmentWorker[segments];
    this.currentFutures = new Future[segments];
}
 
/**
 * Builds the runtime task for a task spec: stores the input/output specs,
 * allocates the lookup maps, creates the processor, and sets up a daemon
 * thread pool plus completion service for initialization work.
 *
 * @param taskSpec specification providing inputs, outputs, group inputs and the processor descriptor
 * @param appAttemptNumber attempt number stored on this task
 * @param tezConf configuration passed to the superclass and the memory distributor
 * @param localDirs local directories stored on this task
 * @param tezUmbilical umbilical passed to the superclass
 * @param serviceConsumerMetadata metadata map stored on this task
 * @param startedInputsMap multimap stored on this task
 * @throws IOException declared by this constructor; the throwing call is not identifiable from this block
 */
public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
    Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
    Map<String, ByteBuffer> serviceConsumerMetadata,
    Multimap<String, String> startedInputsMap) throws IOException {
  // TODO Remove jobToken from here post TEZ-421
  super(taskSpec, tezConf, tezUmbilical);
  LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
      + taskSpec);
  int numInputs = taskSpec.getInputs().size();
  int numOutputs = taskSpec.getOutputs().size();
  this.localDirs = localDirs;
  this.inputSpecs = taskSpec.getInputs();
  // Concurrent maps pre-sized to the number of inputs / outputs.
  this.inputsMap = new ConcurrentHashMap<String, LogicalInput>(numInputs);
  this.inputContextMap = new ConcurrentHashMap<String, TezInputContext>(numInputs);
  this.outputSpecs = taskSpec.getOutputs();
  this.outputsMap = new ConcurrentHashMap<String, LogicalOutput>(numOutputs);
  this.outputContextMap = new ConcurrentHashMap<String, TezOutputContext>(numOutputs);

  // Insertion-ordered maps.
  this.runInputMap = new LinkedHashMap<String, LogicalInput>();
  this.runOutputMap = new LinkedHashMap<String, LogicalOutput>();

  this.processorDescriptor = taskSpec.getProcessorDescriptor();
  this.processor = createProcessor(processorDescriptor);
  this.serviceConsumerMetadata = serviceConsumerMetadata;
  this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();
  this.state = State.NEW;
  this.appAttemptNumber = appAttemptNumber;
  int numInitializers = numInputs + numOutputs; // Processor is initialized in the main thread.
  // A fixed thread pool cannot be created with zero threads, so use at least one.
  numInitializers = (numInitializers == 0 ? 1 : numInitializers); 
  // Daemon threads named "Initializer <n>".
  this.initializerExecutor = Executors.newFixedThreadPool(
      numInitializers,
      new ThreadFactoryBuilder().setDaemon(true)
          .setNameFormat("Initializer %d").build());
  this.initializerCompletionService = new ExecutorCompletionService<Void>(
      this.initializerExecutor);
  this.groupInputSpecs = taskSpec.getGroupInputs();
  initialMemoryDistributor = new MemoryDistributor(numInputs, numOutputs, tezConf);
  this.startedInputsMap = startedInputsMap;
  this.inputReadyTracker = new InputReadyTracker();
}
 
/**
 * Creates a packager that submits work through a completion service built on
 * the supplied executor service.
 *
 * @param containers passed to the superclass constructor
 * @param executorService executor stored on this packager and wrapped by the completion service
 * @param threads thread count stored on this packager
 * @param rotate3D passed to the superclass constructor
 * @param binarySearch passed to the superclass constructor
 * @param checkpointsPerDeadlineCheck passed to the superclass constructor
 */
public ParallelBruteForcePackager(List<Container> containers, ExecutorService executorService, int threads, boolean rotate3D, boolean binarySearch, int checkpointsPerDeadlineCheck) {
	super(containers, rotate3D, binarySearch, checkpointsPerDeadlineCheck);
	
	this.threads = threads;
	this.executorService = executorService;
	// The executor is supplied by the caller; this constructor does not create one.
	this.executorCompletionService = new ExecutorCompletionService<PackResult>(executorService);
}
 
/**
 * Sends one warm-up request, then fires 100000 requests through a 12-thread
 * pool against a pooled client and logs the elapsed time and derived QPS.
 *
 * <p>The thread pool is shut down in a {@code finally} block; the original
 * never released it, so its worker threads outlived the test.
 */
public void testPool() throws Exception {
    PbrpcClient client = PbrpcClientFactory.buildPooledConnection(new PooledConfiguration(),
            "127.0.0.1", 8088, 4000);

    // Single warm-up call whose response is printed.
    PbrpcMsg msg;
    msg = new PbrpcMsg();
    msg.setServiceId(100);
    msg.setProvider("beidou");
    msg.setData(getData(1));
    DemoResponse res = client.asyncTransport(DemoResponse.class, msg).get();
    System.out.println(res);

    int multiSize = 12;
    int totalRequestSize = 100000;
    ExecutorService pool = Executors.newFixedThreadPool(multiSize);
    try {
        CompletionService<DemoResponse> completionService = new ExecutorCompletionService<DemoResponse>(
                pool);

        Invoker invoker = new Invoker(client);
        long time = System.currentTimeMillis();
        for (int i = 0; i < totalRequestSize; i++) {
            completionService.submit(invoker);
        }

        // Drain every completion; get() rethrows a failure of any request.
        for (int i = 0; i < totalRequestSize; i++) {
            completionService.take().get();
        }

        long timetook = System.currentTimeMillis() - time;
        LOG.info("Total using " + timetook + "ms");
        LOG.info("QPS:" + 1000f / ((timetook) / (1.0f * totalRequestSize)));
    } finally {
        pool.shutdown();
    }
}
 
源代码23 项目: tinkerpop   文件: TinkerWorkerPool.java
/**
 * Creates a fixed-size worker pool and splits the graph's vertices into one
 * list per worker.
 *
 * <p>Each worker gets roughly {@code vertexCount / numberOfWorkers} vertices
 * (at least one per batch). The last worker's list takes every vertex that is
 * left over.
 *
 * @param graph graph whose vertices are distributed across the workers
 * @param memory memory wrapped in a separate {@code TinkerWorkerMemory} per worker
 * @param numberOfWorkers number of threads and of vertex partitions
 */
public TinkerWorkerPool(final TinkerGraph graph, final TinkerMemory memory, final int numberOfWorkers) {
    this.numberOfWorkers = numberOfWorkers;
    this.workerPool = Executors.newFixedThreadPool(numberOfWorkers, THREAD_FACTORY_WORKER);
    this.completionService = new ExecutorCompletionService<>(this.workerPool);
    // One worker memory and one (initially empty) vertex list per worker.
    for (int i = 0; i < this.numberOfWorkers; i++) {
        this.workerMemoryPool.add(new TinkerWorkerMemory(memory));
        this.workerVertices.add(new ArrayList<>());
    }
    // Integer division; with fewer vertices than workers fall back to batches of one.
    int batchSize = TinkerHelper.getVertices(graph).size() / this.numberOfWorkers;
    if (0 == batchSize)
        batchSize = 1;
    int counter = 0;
    int index = 0;

    List<Vertex> currentWorkerVertices = this.workerVertices.get(index);
    final Iterator<Vertex> iterator = graph.vertices();
    while (iterator.hasNext()) {
        final Vertex vertex = iterator.next();
        // Keep filling the current list until it holds batchSize vertices; the
        // last list is never left, so it absorbs the remainder.
        if (counter++ < batchSize || index == this.workerVertices.size() - 1) {
            currentWorkerVertices.add(vertex);
        } else {
            // Current list is full: move to the next worker and start its count at 1.
            currentWorkerVertices = this.workerVertices.get(++index);
            currentWorkerVertices.add(vertex);
            counter = 1;
        }
    }
}
 
源代码24 项目: java-n-IDE-for-Android   文件: WaitableExecutor.java
/**
 * Creates an executor that will use at most <var>nThreads</var> threads.
 * @param nThreads the number of threads, or zero for default count (which is number of core)
 */
public WaitableExecutor(int nThreads) {
    if (nThreads < 1) {
        nThreads = Runtime.getRuntime().availableProcessors();
    }

    mExecutorService = Executors.newFixedThreadPool(nThreads);
    mCompletionService = new ExecutorCompletionService<T>(mExecutorService);
}
 
源代码25 项目: hbase   文件: LogRollBackupSubprocedurePool.java
/**
 * Creates the pool that runs log-roll backup subprocedure tasks.
 *
 * <p>The keep-alive is read from a configuration key expressed in
 * milliseconds, so it is passed to the executor as
 * {@link TimeUnit#MILLISECONDS}. The previous {@code TimeUnit.SECONDS}
 * inflated the configured value by a factor of 1000.
 *
 * @param name name stored on this pool and embedded in the worker thread names
 * @param conf configuration supplying the keep-alive and the maximum thread count
 */
public LogRollBackupSubprocedurePool(String name, Configuration conf) {
  // configure the executor service
  long keepAlive =
      conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY,
        LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT);
  int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS);
  this.name = name;
  // NOTE(review): with an unbounded LinkedBlockingQueue a ThreadPoolExecutor does not
  // grow beyond its core size (1 here), so "threads" may never take effect — confirm
  // whether that is intended.
  executor =
      new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<>(),
          Threads.newDaemonThreadFactory("rs(" + name + ")-backup"));
  taskPool = new ExecutorCompletionService<>(executor);
}
 
源代码26 项目: stepchain   文件: StepAsyncExecutor.java
/**
 * Runs the given processors for {@code context} on a freshly created fixed
 * thread pool and returns their combined outcome.
 *
 * <p>The pool is shut down even when the processors throw; previously an
 * exception skipped the shutdown and left the worker threads alive.
 *
 * @param context input handed to the processors
 * @param processors processors to run; an empty collection yields {@code true}
 * @return the result reported by {@code executeByServiceThreadPool}, or {@code true} when there is nothing to run
 * @throws InterruptedException if interrupted while waiting for the processors or for pool termination
 * @throws ExecutionException if a processor failed
 */
@Override
public Boolean execute(I context, Collection<IProcessor<I, Boolean>> processors) throws InterruptedException, ExecutionException {
	Boolean results = true;

	if (!processors.isEmpty()) {
		ThreadPoolExecutor threadPoolExecutor = ThreadPoolFactory.newFixedThreadPool(parallelCount);
		try {
			CompletionService<Boolean> cService = new ExecutorCompletionService<>(threadPoolExecutor);
			results = executeByServiceThreadPool(cService, context, processors);
		} finally {
			threadPoolExecutor.shutdown();
		}
		// Only reached on success: give outstanding work time to drain.
		threadPoolExecutor.awaitTermination(11, TimeUnit.HOURS);
	}
	return results;
}
 
源代码27 项目: JTAF-XCore   文件: ConcurrentScheduler.java
public ConcurrentScheduler() {

		int threadCount = AutomationEngine.getInstance().getTestAgenda().getThreadCount();
		if (threadCount <= 0) {
			NTHREADS = 1;
		} else {
			NTHREADS = threadCount;
		}
		execService = Executors.newFixedThreadPool(NTHREADS);
		completionService = new ExecutorCompletionService<String>(execService);
		digraph = AutomationEngine.getInstance().getTestDigraph();
		
	}
 
源代码28 项目: netbeans   文件: IndexBinaryWorkPool.java
/**
 * Submits one task per binary root, waits for all of them, and returns the
 * non-null URLs they produced together with a success flag. The flag is
 * {@code false} when {@code cancel} reports cancellation or itself throws.
 */
@Override
@NonNull
public Pair<Boolean,Collection<? extends URL>> execute(
        @NonNull final Function<URL,Boolean> fnc,
        @NonNull final Callable<Boolean> cancel,
        @NonNull final Collection<? extends URL> binaries) {
    final CompletionService<URL> cs = new ExecutorCompletionService<URL>(RP);
    int pending = 0;
    for (URL binary : binaries) {
        cs.submit(new Task(binary,fnc, cancel));
        pending++;
    }
    final Collection<URL> result = new ArrayDeque<URL>();
    // Deliberately no early exit on cancel: every submitted task is awaited
    // (cancelled ones should finish quickly), because breaking out would make
    // the logged number of scanned roots wrong.
    while (pending > 0) {
        pending--;
        try {
            final URL url = cs.take().get();
            if (url != null) {
                result.add(url);
            }
        } catch (Exception ex) {
            Exceptions.printStackTrace(ex);
        }
    }
    boolean canceled;
    try {
        canceled = cancel.call();
    } catch (Exception e) {
        Exceptions.printStackTrace(e);
        // A failing cancel check is reported the same way as a cancellation.
        canceled = true;
    }
    LOG.log(Level.FINER, "Canceled: {0}", canceled);  //NOI18N
    return Pair.<Boolean,Collection<? extends URL>>of(!canceled,result);
}
 
/**
 * Creates the pool that runs flush-table subprocedure tasks.
 *
 * @param name name stored on this pool and embedded in the thread name prefix
 * @param conf configuration supplying the keep-alive (milliseconds) and the thread bound
 * @param abortable abortable stored on this pool
 */
FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) {
  this.abortable = abortable;
  // configure the executor service
  // The keep-alive comes from a *_MILLIS key and is passed below as MILLISECONDS.
  long keepAlive = conf.getLong(
    RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY,
    RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT);
  int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
  this.name = name;
  // Threads are named with the prefix "rs(<name>)-flush-proc".
  executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
      "rs(" + name + ")-flush-proc");
  taskPool = new ExecutorCompletionService<>(executor);
}
 
源代码30 项目: hudi   文件: BoundedInMemoryExecutor.java
/**
 * Main API to run both production and consumption.
 *
 * <p>Starts the producers, starts the consumer, and blocks until the consumer
 * finishes. If the calling thread is interrupted while waiting, its interrupt
 * status is restored before the failure is rethrown.
 *
 * @return the value produced by the consumer
 * @throws HoodieException wrapping any failure, including interruption
 */
public E execute() {
  try {
    // The producers' completion service is not needed here; only the consumer is awaited.
    startProducers();
    Future<E> future = startConsumer();
    // Wait for consumer to be done
    return future.get();
  } catch (InterruptedException e) {
    // Do not lose the interrupt when converting to an unchecked exception.
    Thread.currentThread().interrupt();
    throw new HoodieException(e);
  } catch (Exception e) {
    throw new HoodieException(e);
  }
}
 
 类所在包
 同包方法