类java.util.concurrent.ScheduledThreadPoolExecutor源码实例Demo

下面列出了怎么用java.util.concurrent.ScheduledThreadPoolExecutor的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: AlgoTrader   文件: TimerServiceImpl.java
private void getScheduledThreadPoolExecutorDaemonThread() {
	timer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
		// set new thread as daemon thread and name appropriately
		public Thread newThread(Runnable r) {
               String uri = engineURI;
               if (engineURI == null)
               {
                   uri = "default";
               }
               Thread t = new Thread(r, "com.espertech.esper.Timer-" + uri + "-" + id);
			//t.setDaemon(true);
			return t;
		}
	});
	timer.setMaximumPoolSize(timer.getCorePoolSize());
}
 
源代码2 项目: hadoop-ozone   文件: ThrottledAsyncChecker.java
public ThrottledAsyncChecker(final Timer timer,
                             final long minMsBetweenChecks,
                             final long diskCheckTimeout,
                             final ExecutorService executorService) {
  this.timer = timer;
  this.minMsBetweenChecks = minMsBetweenChecks;
  this.diskCheckTimeout = diskCheckTimeout;
  this.executorService = MoreExecutors.listeningDecorator(executorService);
  this.checksInProgress = new HashMap<>();
  this.completedChecks = new WeakHashMap<>();

  if (this.diskCheckTimeout > 0) {
    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new
        ScheduledThreadPoolExecutor(1);
    this.scheduledExecutorService = MoreExecutors
        .getExitingScheduledExecutorService(scheduledThreadPoolExecutor);
  } else {
    this.scheduledExecutorService = null;
  }
}
 
源代码3 项目: vespa   文件: Metrics.java
private Metrics(Metric metric, Statistics statistics, HealthMonitorConfig healthMonitorConfig,
                ZookeeperServerConfig zkServerConfig, boolean createZkMetricUpdater) {
    this.metric = metric;
    requests = createCounter(METRIC_REQUESTS, statistics);
    failedRequests = createCounter(METRIC_FAILED_REQUESTS, statistics);
    procTimeCounter = createCounter("procTime", statistics);

    if (createZkMetricUpdater) {
        log.log(Level.FINE, "Metric update interval is " + healthMonitorConfig.snapshot_interval() + " seconds");
        long intervalMs = (long) (healthMonitorConfig.snapshot_interval() * 1000);
        executorService = Optional.of(new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("configserver-metrics")));
        executorService.get().scheduleAtFixedRate(this, 20000, intervalMs, TimeUnit.MILLISECONDS);
        zkMetricUpdater = Optional.of(new ZKMetricUpdater(zkServerConfig, 19500, intervalMs));
    } else {
        executorService = Optional.empty();
        zkMetricUpdater = Optional.empty();
    }
}
 
源代码4 项目: AndroidWearable-Samples   文件: MainActivity.java
@Override
public void onCreate(Bundle b) {
    super.onCreate(b);
    mHandler = new Handler();
    LOGD(TAG, "onCreate");
    mCameraSupported = getPackageManager().hasSystemFeature(PackageManager.FEATURE_CAMERA);
    setContentView(R.layout.main_activity);
    setupViews();

    // Stores DataItems received by the local broadcaster or from the paired watch.
    mDataItemListAdapter = new DataItemAdapter(this, android.R.layout.simple_list_item_1);
    mDataItemList.setAdapter(mDataItemListAdapter);

    mGeneratorExecutor = new ScheduledThreadPoolExecutor(1);

    mGoogleApiClient = new GoogleApiClient.Builder(this)
            .addApi(Wearable.API)
            .addConnectionCallbacks(this)
            .addOnConnectionFailedListener(this)
            .build();
}
 
源代码5 项目: openjdk-jdk9   文件: ScheduledExecutorTest.java
/**
 * A fixed delay task with overflowing period should not prevent a
 * one-shot task from executing.
 * https://bugs.openjdk.java.net/browse/JDK-8051859
 */
public void testScheduleWithFixedDelay_overflow() throws Exception {
    final CountDownLatch delayedDone = new CountDownLatch(1);
    final CountDownLatch immediateDone = new CountDownLatch(1);
    final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
    try (PoolCleaner cleaner = cleaner(p)) {
        final Runnable immediate = new Runnable() { public void run() {
            immediateDone.countDown();
        }};
        final Runnable delayed = new Runnable() { public void run() {
            delayedDone.countDown();
            p.submit(immediate);
        }};
        p.scheduleWithFixedDelay(delayed, 0L, Long.MAX_VALUE, SECONDS);
        await(delayedDone);
        await(immediateDone);
    }
}
 
源代码6 项目: openjdk-jdk9   文件: ScheduledExecutorTest.java
/**
 * getQueue returns the work queue, which contains queued tasks
 */
public void testGetQueue() throws InterruptedException {
    final CountDownLatch done = new CountDownLatch(1);
    final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
    try (PoolCleaner cleaner = cleaner(p, done)) {
        final CountDownLatch threadStarted = new CountDownLatch(1);
        ScheduledFuture[] tasks = new ScheduledFuture[5];
        for (int i = 0; i < tasks.length; i++) {
            Runnable r = new CheckedRunnable() {
                public void realRun() throws InterruptedException {
                    threadStarted.countDown();
                    await(done);
                }};
            tasks[i] = p.schedule(r, 1, MILLISECONDS);
        }
        await(threadStarted);
        BlockingQueue<Runnable> q = p.getQueue();
        assertTrue(q.contains(tasks[tasks.length - 1]));
        assertFalse(q.contains(tasks[0]));
    }
}
 
源代码7 项目: astor   文件: TimedSemaphore.java
/**
 * Creates a new instance of {@link TimedSemaphore} and initializes it with
 * an executor service, the given time period, and the limit. The executor
 * service will be used for creating a periodic task for monitoring the time
 * period. It can be <b>null</b>, then a default service will be created.
 *
 * @param service the executor service
 * @param timePeriod the time period
 * @param timeUnit the unit for the period
 * @param limit the limit for the semaphore
 * @throws IllegalArgumentException if the period is less or equals 0
 */
public TimedSemaphore(ScheduledExecutorService service, long timePeriod,
        TimeUnit timeUnit, int limit) {
    if (timePeriod <= 0) {
        throw new IllegalArgumentException("Time period must be greater 0!");
    }

    period = timePeriod;
    unit = timeUnit;

    if (service != null) {
        executorService = service;
        ownExecutor = false;
    } else {
        ScheduledThreadPoolExecutor s = new ScheduledThreadPoolExecutor(
                THREAD_POOL_SIZE);
        s.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        s.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        executorService = s;
        ownExecutor = true;
    }

    setLimit(limit);
}
 
源代码8 项目: j2objc   文件: ScheduledExecutorTest.java
/**
 * purge eventually removes cancelled tasks from the queue
 */
public void testPurge() throws InterruptedException {
    final ScheduledFuture[] tasks = new ScheduledFuture[5];
    final Runnable releaser = new Runnable() { public void run() {
        for (ScheduledFuture task : tasks)
            if (task != null) task.cancel(true); }};
    final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
    try (PoolCleaner cleaner = cleaner(p, releaser)) {
        for (int i = 0; i < tasks.length; i++)
            tasks[i] = p.schedule(new SmallPossiblyInterruptedRunnable(),
                                  LONG_DELAY_MS, MILLISECONDS);
        int max = tasks.length;
        if (tasks[4].cancel(true)) --max;
        if (tasks[3].cancel(true)) --max;
        // There must eventually be an interference-free point at
        // which purge will not fail. (At worst, when queue is empty.)
        long startTime = System.nanoTime();
        do {
            p.purge();
            long count = p.getTaskCount();
            if (count == max)
                return;
        } while (millisElapsedSince(startTime) < LONG_DELAY_MS);
        fail("Purge failed to remove cancelled tasks");
    }
}
 
源代码9 项目: openjdk-jdk9   文件: ScheduledExecutorTest.java
/**
 * purge eventually removes cancelled tasks from the queue
 */
public void testPurge() throws InterruptedException {
    final ScheduledFuture[] tasks = new ScheduledFuture[5];
    final Runnable releaser = new Runnable() { public void run() {
        for (ScheduledFuture task : tasks)
            if (task != null) task.cancel(true); }};
    final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
    try (PoolCleaner cleaner = cleaner(p, releaser)) {
        for (int i = 0; i < tasks.length; i++)
            tasks[i] = p.schedule(new SmallPossiblyInterruptedRunnable(),
                                  LONG_DELAY_MS, MILLISECONDS);
        int max = tasks.length;
        if (tasks[4].cancel(true)) --max;
        if (tasks[3].cancel(true)) --max;
        // There must eventually be an interference-free point at
        // which purge will not fail. (At worst, when queue is empty.)
        long startTime = System.nanoTime();
        do {
            p.purge();
            long count = p.getTaskCount();
            if (count == max)
                return;
        } while (millisElapsedSince(startTime) < LONG_DELAY_MS);
        fail("Purge failed to remove cancelled tasks");
    }
}
 
源代码10 项目: jdk8u60   文件: ZeroCorePoolSize.java
void test(String[] args) throws Throwable {

        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(0);
        Runnable task = new Runnable() {
            public void run() {
                taskRun = true;
            }
        };
        check(pool.getCorePoolSize() == 0);

        pool.schedule(task, 1, TimeUnit.SECONDS);

        pool.shutdown();
        check(pool.awaitTermination(20L, TimeUnit.SECONDS));
        check(pool.getCorePoolSize() == 0);
        check(taskRun);
    }
 
源代码11 项目: qpid-jms   文件: MockProvider.java
public MockProvider(URI remoteURI, MockProviderConfiguration configuration, MockRemotePeer context, ProviderFutureFactory futureFactory) {
    this.remoteURI = remoteURI;
    this.configuration = configuration;
    this.context = context;
    this.stats = new MockProviderStats(context != null ? context.getContextStats() : null);
    this.futureFactory = futureFactory;

    serializer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {

        @Override
        public Thread newThread(Runnable runner) {
            Thread serial = new Thread(runner);
            serial.setDaemon(true);
            serial.setName(MockProvider.this.getClass().getSimpleName() + ":(" +
                           PROVIDER_SEQUENCE.incrementAndGet() + "):[" +
                           getRemoteURI() + "]");
            return serial;
        }
    });

    serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    serializer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
}
 
@Test
public void sendNormalEvents() {
    CustomStorageHelper fs = new CustomStorageHelper();
    ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    try {
        for(int i = 0; i < 50; i++) {
            fs.add(EventHelper.generateABCEvent());
        }
    } catch (Exception e) {
    }

    ArrayList<IStorage> storages = new ArrayList<IStorage>();
    storages.add(fs);

    EventQueueWriter eventQueueWriter = new EventQueueWriter(url, storages, new ClientTelemetry(), new ArrayList<ICllEvents>(), new CustomLogger(), scheduledExecutorService);
    eventQueueWriter.setSender(eventSenderOverride);
    eventQueueWriter.run();

    assert(eventSenderOverride.getNumberOfEventsAccepted() == 50);
}
 
源代码13 项目: big-c   文件: JobHistory.java
@Override
protected void serviceStart() throws Exception {
  hsManager.start();
  if (storage instanceof Service) {
    ((Service) storage).start();
  }

  scheduledExecutor = new ScheduledThreadPoolExecutor(2,
      new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
          .build());

  scheduledExecutor.scheduleAtFixedRate(new MoveIntermediateToDoneRunnable(),
      moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);

  // Start historyCleaner
  scheduleHistoryCleaner();
  super.serviceStart();
}
 
源代码14 项目: j2objc   文件: ScheduledExecutorSubclassTest.java
/**
 * getQueue returns the work queue, which contains queued tasks
 */
public void testGetQueue() throws InterruptedException {
    final CountDownLatch done = new CountDownLatch(1);
    final ScheduledThreadPoolExecutor p = new CustomExecutor(1);
    try (PoolCleaner cleaner = cleaner(p, done)) {
        final CountDownLatch threadStarted = new CountDownLatch(1);
        ScheduledFuture[] tasks = new ScheduledFuture[5];
        for (int i = 0; i < tasks.length; i++) {
            Runnable r = new CheckedRunnable() {
                public void realRun() throws InterruptedException {
                    threadStarted.countDown();
                    await(done);
                }};
            tasks[i] = p.schedule(r, 1, MILLISECONDS);
        }
        await(threadStarted);
        BlockingQueue<Runnable> q = p.getQueue();
        assertTrue(q.contains(tasks[tasks.length - 1]));
        assertFalse(q.contains(tasks[0]));
    }
}
 
源代码15 项目: elasticsearch-helper   文件: BulkProcessor.java
BulkProcessor(Client client, Listener listener, @Nullable String name, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) {
    this.bulkActions = bulkActions;
    this.bulkSize = bulkSize.bytes();

    this.bulkRequest = new BulkRequest();
    this.bulkRequestHandler = concurrentRequests == 0 ?
            new SyncBulkRequestHandler(client, listener) :
            new AsyncBulkRequestHandler(client, listener, concurrentRequests);

    if (flushInterval != null) {
        this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(client.settings(), (name != null ? "[" + name + "]" : "") + "bulk_processor"));
        this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS);
    } else {
        this.scheduler = null;
        this.scheduledFuture = null;
    }
}
 
源代码16 项目: openjdk-jdk9   文件: ScheduledExecutorTest.java
/**
 * timed invokeAny(c) throws ExecutionException if no task completes
 */
public void testTimedInvokeAny4() throws Exception {
    final ExecutorService e = new ScheduledThreadPoolExecutor(2);
    try (PoolCleaner cleaner = cleaner(e)) {
        long startTime = System.nanoTime();
        List<Callable<String>> l = new ArrayList<>();
        l.add(new NPETask());
        try {
            e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS);
            shouldThrow();
        } catch (ExecutionException success) {
            assertTrue(success.getCause() instanceof NullPointerException);
        }
        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
    }
}
 
源代码17 项目: conductor   文件: ExecutionDAOFacade.java
@Inject
public ExecutionDAOFacade(ExecutionDAO executionDAO, QueueDAO queueDAO, IndexDAO indexDAO,
    RateLimitingDAO rateLimitingDao, PollDataDAO pollDataDAO, ObjectMapper objectMapper, Configuration config) {
    this.executionDAO = executionDAO;
    this.queueDAO = queueDAO;
    this.indexDAO = indexDAO;
    this.rateLimitingDao = rateLimitingDao;
    this.pollDataDAO = pollDataDAO;
    this.objectMapper = objectMapper;
    this.config = config;
    this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(4,
        (runnable, executor) -> {
        LOGGER.warn("Request {} to delay updating index dropped in executor {}", runnable, executor);
        Monitors.recordDiscardedIndexingCount("delayQueue");
    });
    this.scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
}
 
源代码18 项目: dns-java   文件: DnsSrvWatchers.java
public DnsSrvWatcher<T> build() {
  checkState(polling ^ dnsSrvWatcherFactory != null, "specify either polling or custom trigger");

  DnsSrvWatcherFactory<T> watcherFactory;
  if (polling) {
    final ScheduledExecutorService executor =
        scheduledExecutorService != null
        ? scheduledExecutorService
        : MoreExecutors.getExitingScheduledExecutorService(
            new ScheduledThreadPoolExecutor(
                1, new ThreadFactoryBuilder().setNameFormat("dns-lookup-%d").build()),
            0, SECONDS);

    watcherFactory =
        cnf -> new PollingDnsSrvWatcher<>(cnf, executor, pollingInterval, pollingIntervalUnit);
  } else {
    watcherFactory = requireNonNull(dnsSrvWatcherFactory, "dnsSrvWatcherFactory");
  }

  final ChangeNotifierFactory<T> changeNotifierFactory =
      fqdn -> new ServiceResolvingChangeNotifier<>(
          resolver, fqdn, resultTransformer, errorHandler);

  return watcherFactory.create(changeNotifierFactory);
}
 
源代码19 项目: gemfirexd-oss   文件: CacheClientNotifier.java
public void addToBlacklistedClient(ClientProxyMembershipID proxyID) {
  blackListedClients.add(proxyID);
  // ensure that cache and distributed system state are current and open
  this.getCache();
  new ScheduledThreadPoolExecutor(1).schedule(
      new ExpireBlackListTask(proxyID), 120, TimeUnit.SECONDS);
}
 
private ScheduledExecutorService createScheduledExecutorService() {
    ManagedThreadFactory managedThreadFactory;
    try {
        managedThreadFactory = ThreadFactories.findThreadFactory(threadFactory);
    } catch (final Exception e) {
        Logger.getInstance(LogCategory.OPENEJB, ManagedScheduledExecutorServiceImplFactory.class).warning("Unable to create configured thread factory: " + threadFactory, e);
        managedThreadFactory = new ManagedThreadFactoryImpl();
    }

    return new ScheduledThreadPoolExecutor(core, managedThreadFactory, CURejectHandler.INSTANCE);
}
 
源代码21 项目: Raincat   文件: TxManagerLocator.java
private TxManagerLocator() {
    List<TxManagerServiceDTO> initial = Lists.newArrayList();
    listAtomicReference = new AtomicReference<>(initial);
    type = new TypeToken<List<TxManagerServiceDTO>>() {
    }.getType();
    this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
            TxTransactionThreadFactory.create("TxManagerLocator", true));
}
 
/**
 * Schedule the sweeper and stability checkers
 */
public synchronized void start() {
  if (!_started.get()) {
    _executorService = new ScheduledThreadPoolExecutor(1,
        ExecutorsUtils.newThreadFactory(Optional.of(LoggerFactory.getLogger(FineGrainedWatermarkTracker.class))));
    _executorService.scheduleAtFixedRate(_sweeper, 0, _sweepIntervalMillis, TimeUnit.MILLISECONDS);
    _executorService.scheduleAtFixedRate(_stabilityChecker, 0, _stabilityCheckIntervalMillis, TimeUnit.MILLISECONDS);
  }
  _started.set(true);
}
 
源代码23 项目: jboot   文件: JbootScheduleManager.java
public JbootScheduleManager() {
    config = Jboot.config(JbooScheduleConfig.class);
    fixedScheduler = new ScheduledThreadPoolExecutor(config.getPoolSize(),new NamedThreadFactory("jboot-scheduler"));

    File cron4jProperties = new File(PathKit.getRootClassPath(), config.getCron4jFile());
    cron4jPlugin = cron4jProperties.exists()
            ? new JbootCron4jPlugin(new Prop(config.getCron4jFile()))
            : new JbootCron4jPlugin();
}
 
源代码24 项目: j2objc   文件: ScheduledExecutorTest.java
/**
 * completed submit of callable returns result
 */
public void testSubmitCallable() throws Exception {
    final ExecutorService e = new ScheduledThreadPoolExecutor(2);
    try (PoolCleaner cleaner = cleaner(e)) {
        Future<String> future = e.submit(new StringTask());
        String result = future.get();
        assertSame(TEST_STRING, result);
    }
}
 
LazyTraceScheduledThreadPoolExecutor(int corePoolSize,
		RejectedExecutionHandler handler, BeanFactory beanFactory,
		ScheduledThreadPoolExecutor delegate) {
	super(corePoolSize, handler);
	this.beanFactory = beanFactory;
	this.delegate = delegate;
	this.decorateTaskRunnable = ReflectionUtils.findMethod(
			ScheduledThreadPoolExecutor.class, "decorateTask", Runnable.class,
			RunnableScheduledFuture.class);
	makeAccessibleIfNotNull(this.decorateTaskRunnable);
	this.decorateTaskCallable = ReflectionUtils.findMethod(
			ScheduledThreadPoolExecutor.class, "decorateTaskCallable", Callable.class,
			RunnableScheduledFuture.class);
	makeAccessibleIfNotNull(this.decorateTaskCallable);
	this.finalize = ReflectionUtils.findMethod(ScheduledThreadPoolExecutor.class,
			"finalize", null);
	makeAccessibleIfNotNull(this.finalize);
	this.beforeExecute = ReflectionUtils.findMethod(ScheduledThreadPoolExecutor.class,
			"beforeExecute", null);
	makeAccessibleIfNotNull(this.beforeExecute);
	this.afterExecute = ReflectionUtils.findMethod(ScheduledThreadPoolExecutor.class,
			"afterExecute", null);
	makeAccessibleIfNotNull(this.afterExecute);
	this.terminated = ReflectionUtils.findMethod(ScheduledThreadPoolExecutor.class,
			"terminated", null);
	makeAccessibleIfNotNull(this.terminated);
	this.newTaskForRunnable = ReflectionUtils.findMethod(
			ScheduledThreadPoolExecutor.class, "newTaskFor", Runnable.class,
			Object.class);
	makeAccessibleIfNotNull(this.newTaskForRunnable);
	this.newTaskForCallable = ReflectionUtils.findMethod(
			ScheduledThreadPoolExecutor.class, "newTaskFor", Callable.class,
			Object.class);
	makeAccessibleIfNotNull(this.newTaskForCallable);
}
 
源代码26 项目: j2objc   文件: ScheduledExecutorTest.java
/**
 * schedule(null) throws NPE
 */
public void testScheduleNull() throws InterruptedException {
    final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
    try (PoolCleaner cleaner = cleaner(p)) {
        try {
            TrackedCallable callable = null;
            Future f = p.schedule(callable, SHORT_DELAY_MS, MILLISECONDS);
            shouldThrow();
        } catch (NullPointerException success) {}
    }
}
 
private void startFlusher() {
	this.flusher = new ScheduledThreadPoolExecutor(1,
			new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "CloudHBase Flusher"));
	this.flusher.setRemoveOnCancelPolicy(true);
	this.flusher.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
	this.flusher.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

	this.flusher.scheduleAtFixedRate(() -> {
		if (System.currentTimeMillis() - lastFlushTime >= flushInterval) {
			flush(false);
		}
	}, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
}
 
源代码28 项目: j2objc   文件: ScheduledExecutorTest.java
/**
 * timed invokeAll(c) returns results of all completed tasks
 */
public void testTimedInvokeAll5() throws Exception {
    final ExecutorService e = new ScheduledThreadPoolExecutor(2);
    try (PoolCleaner cleaner = cleaner(e)) {
        List<Callable<String>> l = new ArrayList<Callable<String>>();
        l.add(new StringTask());
        l.add(new StringTask());
        List<Future<String>> futures =
            e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS);
        assertEquals(2, futures.size());
        for (Future<String> future : futures)
            assertSame(TEST_STRING, future.get());
    }
}
 
源代码29 项目: compass   文件: DatabaseAvailabilityDetector.java
@Override
public void afterPropertiesSet() throws Exception {
	if (logger.isDebugEnabled()) {
		logger.debug("[Datasource Availability Detector] started.");
	}
	started = true;
	executorService= new ScheduledThreadPoolExecutor(detectPoolSize,
			new DefaultThreadFactory("DatabaseAvailabilityDetector"));
}
 
public TrackerBasedWatermarkManager(WatermarkStorage storage, FineGrainedWatermarkTracker watermarkTracker,
    long commitIntervalMillis, Optional<Logger> logger) {
  Preconditions.checkArgument(storage != null, "WatermarkStorage cannot be null");
  Preconditions.checkArgument(watermarkTracker != null, "WatermarkTracker cannot be null");
  _watermarkTracker = watermarkTracker;
  _watermarkStorage = storage;
  _commitIntervalMillis = commitIntervalMillis;
  _logger = logger.or(LoggerFactory.getLogger(TrackerBasedWatermarkManager.class));
  _watermarkCommitThreadPool = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(logger,
      Optional.of("WatermarkManager-%d")));
  _retrievalStatus = new RetrievalStatus();
  _commitStatus = new CommitStatus();
}
 
 类所在包
 同包方法