下面列出了 com.google.common.util.concurrent.ListeningScheduledExecutorService 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Performs the one-time dump-directory setup.
 *
 * <p>Only the first call does any work: it initializes the dump directories and schedules a
 * recurring {@code clean} of the base dump directory on a dedicated single-thread scheduler.
 * Every later call returns immediately.
 */
public void start() {
    final boolean firstCall = init.compareAndSet(false, true);
    if (!firstCall) {
        return;
    }

    for (String dumpDirName : new String[] {"jstack", "qjdump", "bistoury-class-dump"}) {
        initDumpDir(dumpDirName);
    }

    final File dumpRoot = new File(BASE_DUMP_DIR);
    final ListeningScheduledExecutorService cleaner = MoreExecutors.listeningDecorator(
            Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("clean-dump-file")));
    // No initial delay; afterwards one hour passes between the end of a run and the next start.
    cleaner.scheduleWithFixedDelay(() -> clean(dumpRoot), 0, 1, TimeUnit.HOURS);
}
/**
 * Injection constructor: stores every persistence component and executor it is handed.
 *
 * <p>No work is performed here; the arguments are only kept in fields.
 */
@Inject
PersistenceShutdownHook(final @NotNull ClientSessionPersistence clientSessionPersistence,
        final @NotNull ClientSessionSubscriptionPersistence clientSessionSubscriptionPersistence,
        final @NotNull IncomingMessageFlowPersistence incomingMessageFlowPersistence,
        final @NotNull RetainedMessagePersistence retainedMessagePersistence,
        final @NotNull PublishPayloadPersistence payloadPersistence,
        final @NotNull ClientQueuePersistence clientQueuePersistence,
        final @NotNull @Persistence ListeningExecutorService persistenceExecutorService,
        final @NotNull @Persistence ListeningScheduledExecutorService persistenceScheduledExecutorService,
        final @NotNull @PayloadPersistence ListeningScheduledExecutorService payloadPersistenceExecutor,
        final @NotNull SingleWriterService singleWriterService) {

    // Persistence components.
    this.clientSessionPersistence = clientSessionPersistence;
    this.clientSessionSubscriptionPersistence = clientSessionSubscriptionPersistence;
    this.incomingMessageFlowPersistence = incomingMessageFlowPersistence;
    this.retainedMessagePersistence = retainedMessagePersistence;
    this.payloadPersistence = payloadPersistence;
    this.clientQueuePersistence = clientQueuePersistence;

    // Executors and the single-writer service.
    this.persistenceExecutorService = persistenceExecutorService;
    this.persistenceScheduledExecutorService = persistenceScheduledExecutorService;
    this.payloadPersistenceExecutor = payloadPersistenceExecutor;
    this.singleWriterService = singleWriterService;
}
/**
 * Registers the persistence bindings of this module.
 *
 * <p>Installs the file migration module when the configured persistence mode is {@code FILE}
 * and the in-memory local persistence module otherwise, then binds the payload persistence,
 * metrics and message-dropped services.
 */
@Override
protected void configure() {
    // Eagerly created lifecycle components.
    bind(ShutdownHooks.class).asEagerSingleton();
    bind(PersistenceStartup.class).asEagerSingleton();
    bind(PersistenceStartupShutdownHookInstaller.class).asEagerSingleton();

    final boolean fileBacked =
            persistenceConfigurationService.getMode() == PersistenceConfigurationService.PersistenceMode.FILE;
    if (!fileBacked) {
        install(new LocalPersistenceMemoryModule(null));
    } else {
        install(new PersistenceMigrationFileModule());
    }

    bind(PublishPayloadPersistence.class)
            .to(PublishPayloadPersistenceImpl.class)
            .in(Singleton.class);

    // Metrics.
    bind(MetricRegistry.class).toInstance(metricRegistry);
    bind(MetricsHolder.class)
            .toProvider(MetricsHolderProvider.class)
            .asEagerSingleton();

    // Scheduler qualified with @PayloadPersistence.
    bind(ListeningScheduledExecutorService.class)
            .annotatedWith(PayloadPersistence.class)
            .toProvider(PayloadPersistenceScheduledExecutorProvider.class)
            .in(LazySingleton.class);

    bind(MessageDroppedService.class)
            .toProvider(MessageDroppedServiceProvider.class)
            .in(Singleton.class);
}
@Inject
PublishPayloadPersistenceImpl(final @NotNull PublishPayloadLocalPersistence localPersistence,
final @NotNull @PayloadPersistence ListeningScheduledExecutorService scheduledExecutorService) {
this.localPersistence = localPersistence;
this.scheduledExecutorService = scheduledExecutorService;
hashFunction = LongHashFunction.xx();
payloadCache = CacheBuilder.newBuilder()
.expireAfterAccess(InternalConfigurations.PAYLOAD_CACHE_DURATION.get(), TimeUnit.MILLISECONDS)
.maximumSize(InternalConfigurations.PAYLOAD_CACHE_SIZE.get())
.concurrencyLevel(InternalConfigurations.PAYLOAD_CACHE_CONCURRENCY_LEVEL.get())
.removalListener(new PayloadCacheRemovalListener(hashFunction, lookupTable))
.build();
removeSchedule = InternalConfigurations.PAYLOAD_PERSISTENCE_CLEANUP_SCHEDULE.get();
bucketLock = new BucketLock(InternalConfigurations.PAYLOAD_PERSISTENCE_BUCKET_COUNT.get());
}
/**
 * Creates an invoker from the given executors, factories, timeouts and optional proxy/SSL
 * settings.
 *
 * <p>Every argument is required; a {@code null} value fails with a
 * {@link NullPointerException} naming the argument. Both timeouts are stored as milliseconds,
 * saturated to the {@code int} range.
 */
public ApacheThriftMethodInvoker(
        ListeningExecutorService executorService,
        ListeningScheduledExecutorService delayService,
        TTransportFactory transportFactory,
        TProtocolFactory protocolFactory,
        Duration connectTimeout,
        Duration requestTimeout,
        Optional<HostAndPort> socksProxy,
        Optional<SSLContext> sslContext)
{
    requireNonNull(executorService, "executorService is null");
    this.executorService = executorService;

    requireNonNull(delayService, "delayService is null");
    this.delayService = delayService;

    requireNonNull(transportFactory, "transportFactory is null");
    this.transportFactory = transportFactory;

    requireNonNull(protocolFactory, "protocolFactory is null");
    this.protocolFactory = protocolFactory;

    requireNonNull(connectTimeout, "connectTimeout is null");
    this.connectTimeoutMillis = Ints.saturatedCast(connectTimeout.toMillis());

    requireNonNull(requestTimeout, "requestTimeout is null");
    this.requestTimeoutMillis = Ints.saturatedCast(requestTimeout.toMillis());

    requireNonNull(socksProxy, "socksProxy is null");
    this.socksProxy = socksProxy;

    requireNonNull(sslContext, "sslContext is null");
    this.sslContext = sslContext;
}
/**
 * Schedules {@code task} to run repeatedly, unless this instance is already started.
 *
 * <p>The task runs on a listening decorator around {@code purgeWorker}, with {@code TICK}
 * (in seconds) as both the initial delay and the delay between runs. If the periodic future
 * fails, the cause is logged.
 *
 * @param task the work to run on every tick; must not be {@code null}
 */
public synchronized void start(Runnable task) {
    if (isStarted()) {
        return;
    }
    requireNonNull(task, "task");

    final ListeningScheduledExecutorService decorated = MoreExecutors.listeningDecorator(purgeWorker);
    this.scheduler = decorated;

    final long tickSeconds = TICK.getSeconds();
    @SuppressWarnings("UnstableApiUsage")
    final ListenableScheduledFuture<?> periodic =
            decorated.scheduleWithFixedDelay(task, tickSeconds, tickSeconds, TimeUnit.SECONDS);

    final FutureCallback<Object> failureLogger = new FutureCallback<Object>() {
        @Override
        public void onSuccess(@Nullable Object result) {
            // Nothing to do on success.
        }

        @Override
        public void onFailure(Throwable cause) {
            logger.error("Storage purge scheduler stopped due to an unexpected exception:", cause);
        }
    };
    Futures.addCallback(periodic, failureLogger, purgeWorker);
}
/**
 * Creates a stub instance that stores the supplied values as-is.
 *
 * @param retryService scheduler stored for retries; may be {@code null}
 */
public StubInstance(
        String name,
        String identifier,
        DigestUtil digestUtil,
        ManagedChannel channel,
        long deadlineAfter,
        TimeUnit deadlineAfterUnits,
        Retrier retrier,
        @Nullable ListeningScheduledExecutorService retryService) {
    // Identity.
    this.name = name;
    this.identifier = identifier;

    // Transport and digest handling.
    this.channel = channel;
    this.digestUtil = digestUtil;

    // Deadline settings.
    this.deadlineAfter = deadlineAfter;
    this.deadlineAfterUnits = deadlineAfterUnits;

    // Retry support.
    this.retrier = retrier;
    this.retryService = retryService;
}
/**
 * Simulates a blocking database lookup and returns its result asynchronously.
 *
 * <p>The method itself asserts it runs on the request's event loop with {@code context} as the
 * current request context. The simulated query is submitted to {@code blockingExecutor}, where
 * it asserts the same context is still current but the thread is no longer the event loop.
 *
 * @return a future that yields the fixed list {@code [23, -23]}
 */
@Produces
static ListenableFuture<List<Long>> fetchFromFakeDb(ServiceRequestContext context,
        ListeningScheduledExecutorService blockingExecutor) {
    // The context lives in a thread-local, so all logic (e.g. tracing) can reach it.
    checkState(ServiceRequestContext.current() == context);
    final boolean callerOnEventLoop = context.eventLoop().inEventLoop();
    checkState(callerOnEventLoop);

    // What follows stands in for a blocking call, such as a MySQL query over JDBC. Blocking
    // logic must always run on the blocking task executor. Going through
    // ServiceRequestContext.blockingTaskExecutor (indirectly, via the
    // ListeningScheduledExecutorService wrapper defined in MainModule) also keeps the context
    // mounted inside that logic, so the DB call is traced as well.
    return blockingExecutor.submit(() -> {
        // Still the same context, again reachable through the thread-local.
        checkState(ServiceRequestContext.current() == context);
        final boolean workerOnEventLoop = context.eventLoop().inEventLoop();
        checkState(!workerOnEventLoop);

        Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(50));
        return ImmutableList.of(23L, -23L);
    });
}
/**
 * Builds a {@code BlackHoleConnector} whose page source provider, page sink provider and the
 * connector itself share one single-thread scheduled executor using daemon threads named
 * "blackhole".
 */
@Override
public Connector create(String catalogName, Map<String, String> requiredConfig, ConnectorContext context)
{
    final ListeningScheduledExecutorService sharedExecutor =
            listeningDecorator(newSingleThreadScheduledExecutor(daemonThreadsNamed("blackhole")));

    // Components are created in the order the connector constructor takes them.
    final BlackHoleMetadata metadata = new BlackHoleMetadata();
    final BlackHoleSplitManager splitManager = new BlackHoleSplitManager();
    final BlackHolePageSourceProvider pageSourceProvider = new BlackHolePageSourceProvider(sharedExecutor);
    final BlackHolePageSinkProvider pageSinkProvider = new BlackHolePageSinkProvider(sharedExecutor);
    final BlackHoleNodePartitioningProvider partitioningProvider =
            new BlackHoleNodePartitioningProvider(context.getNodeManager());

    return new BlackHoleConnector(
            metadata,
            splitManager,
            pageSourceProvider,
            pageSinkProvider,
            partitioningProvider,
            context.getTypeManager(),
            sharedExecutor);
}
/**
 * Creates a page source that starts with {@code count} pages left.
 *
 * @param page the page kept by this source; must not be {@code null}
 * @param count number of pages left to produce; must not be negative
 * @param executorService scheduler kept by this source; must not be {@code null}
 * @param pageProcessingDelay delay stored in milliseconds; must not be {@code null}
 */
BlackHolePageSource(Page page, int count, ListeningScheduledExecutorService executorService, Duration pageProcessingDelay)
{
    // Validate everything up front, in the same order the checks have always run.
    requireNonNull(page, "page is null");
    checkArgument(count >= 0, "count is negative");
    requireNonNull(executorService, "executorService is null");
    requireNonNull(pageProcessingDelay, "pageProcessingDelay is null");

    this.page = page;
    this.pagesLeft = count;
    this.executorService = executorService;
    this.pageProcessingDelayInMillis = pageProcessingDelay.toMillis();
    this.memoryUsageBytes = page.getSizeInBytes();
}
/**
 * Registers the persistence-related bindings of this module.
 *
 * <p>Installs the local persistence module, reuses the {@code ShutdownHooks} instance obtained
 * from {@code persistenceInjector}, binds the {@code @Persistence} executors through their
 * providers, binds the {@code @PayloadPersistence} scheduler via {@code bindIfAbsent}, and
 * registers the eagerly created startup and clean-up services.
 */
@Override
protected void configure() {
    install(new LocalPersistenceModule(persistenceInjector, persistenceConfigurationService));

    // Share the ShutdownHooks instance owned by the persistence injector.
    final ShutdownHooks sharedShutdownHooks = persistenceInjector.getInstance(ShutdownHooks.class);
    bind(ShutdownHooks.class).toInstance(sharedShutdownHooks);
    bind(PersistenceShutdownHookInstaller.class).asEagerSingleton();

    // One provider serves both the plain and the listening executor type.
    bind(ExecutorService.class)
            .annotatedWith(Persistence.class)
            .toProvider(PersistenceExecutorProvider.class)
            .in(LazySingleton.class);
    bind(ListeningExecutorService.class)
            .annotatedWith(Persistence.class)
            .toProvider(PersistenceExecutorProvider.class)
            .in(LazySingleton.class);

    // Same arrangement for the scheduled variants.
    bind(ScheduledExecutorService.class)
            .annotatedWith(Persistence.class)
            .toProvider(PersistenceScheduledExecutorProvider.class)
            .in(LazySingleton.class);
    bind(ListeningScheduledExecutorService.class)
            .annotatedWith(Persistence.class)
            .toProvider(PersistenceScheduledExecutorProvider.class)
            .in(LazySingleton.class);

    bindIfAbsent(
            ListeningScheduledExecutorService.class,
            PayloadPersistenceScheduledExecutorProvider.class,
            PayloadPersistence.class);

    bind(TopicTreeStartup.class).asEagerSingleton();
    bind(CleanUpService.class).asEagerSingleton();
    bind(AbstractFutureUtils.class)
            .to(FutureUtilsImpl.class)
            .asEagerSingleton();
    requestStaticInjection(FutureUtils.class);
}
/**
 * Returns the payload-persistence cleanup scheduler, creating it on first use.
 *
 * <p>The pool size is read from
 * {@code InternalConfigurations.PAYLOAD_PERSISTENCE_CLEANUP_THREADS}; the created executor is
 * cached in a field and returned by every later call.
 */
@NotNull
@Override
@LazySingleton
@PayloadPersistence
public ListeningScheduledExecutorService get() {
    if (executorService != null) {
        return executorService;
    }

    final int poolSize = InternalConfigurations.PAYLOAD_PERSISTENCE_CLEANUP_THREADS.get();
    final ThreadFactory cleanupThreads = ThreadFactoryUtil.create("payload-persistence-cleanup-%d");
    final ScheduledExecutorService pool = Executors.newScheduledThreadPool(poolSize, cleanupThreads);

    executorService = MoreExecutors.listeningDecorator(pool);
    return executorService;
}
/**
 * Returns the single-threaded scheduled persistence executor, creating it on first use.
 *
 * <p>The created executor is cached in a field and returned by every later call.
 */
@Override
@LazySingleton
@Persistence
@NotNull
public ListeningScheduledExecutorService get() {
    if (executorService != null) {
        return executorService;
    }

    final ThreadFactory persistenceThreads = ThreadFactoryUtil.create("scheduled-persistence-executor");
    final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor(persistenceThreads);

    this.executorService = MoreExecutors.listeningDecorator(worker);
    return executorService;
}
/**
 * Injection constructor: stores its collaborators and immediately schedules a recurring
 * {@code CheckWillsTask} on the persistence scheduler.
 *
 * <p>{@code WILL_DELAY_CHECK_SCHEDULE} (seconds) is used as both the initial delay and the
 * period of that task.
 */
@Inject
public PendingWillMessages(@NotNull final InternalPublishService publishService,
        @Persistence final ListeningScheduledExecutorService executorService,
        @NotNull final ClientSessionPersistence clientSessionPersistence,
        @NotNull final ClientSessionLocalPersistence clientSessionLocalPersistence) {

    this.clientSessionLocalPersistence = clientSessionLocalPersistence;
    this.clientSessionPersistence = clientSessionPersistence;
    this.executorService = executorService;
    this.publishService = publishService;

    // Scheduled last, after every field above has been assigned.
    executorService.scheduleAtFixedRate(
            new CheckWillsTask(),
            WILL_DELAY_CHECK_SCHEDULE,
            WILL_DELAY_CHECK_SCHEDULE,
            TimeUnit.SECONDS);
}
/**
 * Builds a Guice injector from the given local persistence module, the lazy-singleton,
 * throttling and MQTT service modules, and an inline module that binds this class's
 * pre-built collaborators as instances.
 *
 * <p>The same scheduled executor instance backs both the {@code @Persistence} and the
 * {@code @PayloadPersistence} scheduler bindings.
 */
private Injector createInjector(final LocalPersistenceModule localPersistenceModule) {
    return Guice.createInjector(
            localPersistenceModule,
            new LazySingletonModule(),
            new ThrottlingModule(),
            new MQTTServiceModule(),
            new AbstractModule() {
                @Override
                protected void configure() {
                    // Executors.
                    bind(ListeningExecutorService.class)
                            .annotatedWith(Persistence.class)
                            .toInstance(listeningExecutorService);
                    bind(ListeningScheduledExecutorService.class)
                            .annotatedWith(Persistence.class)
                            .toInstance(listeningScheduledExecutorService);
                    bind(ListeningScheduledExecutorService.class)
                            .annotatedWith(PayloadPersistence.class)
                            .toInstance(listeningScheduledExecutorService);

                    // Metrics.
                    bind(MetricsHolder.class).toInstance(metricsHolder);
                    bind(MetricRegistry.class).toInstance(new MetricRegistry());

                    // Configuration.
                    bind(FullConfigurationService.class).toInstance(configurationService);
                    bind(RestrictionsConfigurationService.class)
                            .toInstance(new RestrictionsConfigurationServiceImpl());
                    bind(MqttConfigurationService.class).toInstance(mqttConfigurationService);

                    // Remaining services.
                    bind(TopicMatcher.class).toInstance(topicMatcher);
                    bind(SystemInformation.class).toInstance(systemInformation);
                    bind(MessageIDPools.class).toInstance(messageIDProducers);
                    bind(SingleWriterService.class).toInstance(singleWriterService);
                    bind(EventLog.class).toInstance(eventLog);
                    bind(MessageDroppedService.class).toInstance(messageDroppedService);
                }
            });
}
/**
 * Creates an endpoint that keeps the given mapper, statement parser, engine and scheduler.
 */
public WSQueryEndpoint(
        ObjectMapper mapper,
        StatementParser statementParser,
        KsqlEngine ksqlEngine,
        ListeningScheduledExecutorService exec
) {
    this.exec = exec;
    this.ksqlEngine = ksqlEngine;
    this.statementParser = statementParser;
    this.mapper = mapper;
}
/**
 * Registers {@code WSQueryEndpoint} with the given websocket container.
 *
 * <p>Every endpoint instance is created by a custom configurator and receives the same JSON
 * mapper, statement parser, engine and a daemon-thread scheduler sized by
 * {@code KsqlRestConfig.KSQL_WEBSOCKETS_NUM_THREADS}. A {@code DeploymentException} is logged
 * and not rethrown.
 */
@Override
protected void registerWebSocketEndpoints(ServerContainer container) {
    try {
        final int threadCount = config.getInt(KsqlRestConfig.KSQL_WEBSOCKETS_NUM_THREADS);
        final ListeningScheduledExecutorService exec = MoreExecutors.listeningDecorator(
                Executors.newScheduledThreadPool(
                        threadCount,
                        new ThreadFactoryBuilder()
                                .setDaemon(true)
                                .setNameFormat("websockets-query-thread-%d")
                                .build()));

        final ObjectMapper mapper = getJsonMapper();
        final StatementParser statementParser = new StatementParser(ksqlEngine);

        // The endpoint path is taken from the @ServerEndpoint annotation on the endpoint class.
        final String endpointPath = WSQueryEndpoint.class.getAnnotation(ServerEndpoint.class).value();
        final ServerEndpointConfig endpointConfig = ServerEndpointConfig.Builder
                .create(WSQueryEndpoint.class, endpointPath)
                .configurator(new Configurator() {
                    @Override
                    @SuppressWarnings("unchecked")
                    public <T> T getEndpointInstance(Class<T> endpointClass) {
                        return (T) new WSQueryEndpoint(mapper, statementParser, ksqlEngine, exec);
                    }
                })
                .build();

        container.addEndpoint(endpointConfig);
    } catch (DeploymentException e) {
        log.error("Unable to create websockets endpoint", e);
    }
}
/**
 * Creates a single-threaded listening scheduler.
 *
 * <p>After shutdown, neither periodic nor delayed tasks keep running, and cancelled tasks are
 * removed from the work queue right away.
 */
private static ListeningScheduledExecutorService newScheduler() {
    final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
    pool.setRemoveOnCancelPolicy(true);
    pool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    pool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
    return MoreExecutors.listeningDecorator(pool);
}
/**
 * Creates a retrier that keeps the given backoff supplier, retriable-status predicate and
 * retry scheduler.
 */
public Retrier(
        Supplier<Backoff> backoffSupplier,
        Predicate<Status> isRetriable,
        ListeningScheduledExecutorService retryScheduler) {
    this.retryScheduler = retryScheduler;
    this.isRetriable = isRetriable;
    this.backoffSupplier = backoffSupplier;
}
/**
 * Opens an input stream over the blob identified by {@code instanceName} and {@code digest}.
 *
 * <p>Delegates to {@code ByteStreamHelper.newInput}, reading from offset 0, applying a
 * 10-second deadline to each stub it supplies, and using {@code Retrier.DEFAULT_IS_RETRIABLE}
 * together with the exponential backoff configured below.
 *
 * @throws IOException if the delegate fails to open the stream
 */
static InputStream newInput(
        String instanceName,
        Digest digest,
        ByteStreamStub bsStub,
        ListeningScheduledExecutorService retryService)
        throws IOException {
    final int startOffset = 0;
    final long deadlineSeconds = 10;
    return ByteStreamHelper.newInput(
            blobName(instanceName, digest),
            startOffset,
            () -> bsStub.withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS),
            Retrier.Backoff.exponential(Duration.ofSeconds(0), Duration.ofSeconds(0), 2, 0, 5),
            Retrier.DEFAULT_IS_RETRIABLE,
            retryService);
}
/**
 * Returns a task that downloads the blob for {@code digest} into {@code root} and then
 * decrements {@code outstandingOperations}.
 *
 * <p>A zero-sized digest needs no download, so its task only decrements the counter.
 * Otherwise the blob is fetched when the target file (named after the digest hash) is missing
 * or has a different size than the digest reports. An {@link IOException} is printed and not
 * rethrown.
 *
 * <p>The counter is decremented in a {@code finally} block, so it is released exactly once
 * even when the download fails with an unchecked exception. Without that, a failure other
 * than {@code IOException} would leave the counter permanently too high.
 *
 * @param root directory the blob file is resolved against
 * @param instanceName instance name used to build the blob resource name
 * @param digest digest identifying the blob and its expected size
 * @param bsStub byte stream stub used for the download
 * @param outstandingOperations counter decremented once when the task finishes
 * @param retryService scheduler passed through to {@code newInput}
 * @return the download task
 */
static Runnable blobGetter(
        Path root,
        String instanceName,
        Digest digest,
        ByteStreamStub bsStub,
        AtomicLong outstandingOperations,
        ListeningScheduledExecutorService retryService) {
    if (digest.getSizeBytes() == 0) {
        return () -> outstandingOperations.getAndDecrement();
    }
    return new Runnable() {
        @Override
        public void run() {
            try {
                Path file = root.resolve(digest.getHash());
                if (!Files.exists(file) || Files.size(file) != digest.getSizeBytes()) {
                    System.out.println("Getting blob " + digest.getHash() + "/" + digest.getSizeBytes());
                    // Resources close in reverse order: the input first, then the output file.
                    try (OutputStream out = Files.newOutputStream(file);
                            InputStream in = newInput(instanceName, digest, bsStub, retryService)) {
                        ByteStreams.copy(in, out);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Always release the slot, whatever happened above.
                outstandingOperations.getAndDecrement();
            }
        }
    };
}
/**
 * Creates a socket factory that keeps the given key pair future, Admin API client, server
 * proxy port and executor.
 */
@VisibleForTesting
CoreSocketFactory(
        ListenableFuture<KeyPair> localKeyPair,
        SQLAdmin adminApi,
        int serverProxyPort,
        ListeningScheduledExecutorService executor) {
    this.localKeyPair = localKeyPair;
    this.adminApi = adminApi;
    this.serverProxyPort = serverProxyPort;
    this.executor = executor;
}
/**
 * Returns a listening scheduled executor backed by a two-thread pool.
 *
 * <p>The pool is wrapped with {@code MoreExecutors.getExitingScheduledExecutorService}, and
 * delayed tasks still queued at shutdown are not executed.
 */
@VisibleForTesting
static ListeningScheduledExecutorService getDefaultExecutor() {
    // TODO(kvg): Figure out correct way to determine number of threads
    final ScheduledThreadPoolExecutor pool =
            (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2);
    pool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

    return MoreExecutors.listeningDecorator(MoreExecutors.getExitingScheduledExecutorService(pool));
}
/**
 * Initializes a new Cloud SQL instance based on the given connection name.
 *
 * <p>The connection name is validated against {@code CONNECTION_NAME}; the project, region and
 * instance ids are taken from its capture groups. The first refresh is started before the
 * constructor returns.
 *
 * @param connectionName instance connection name in the format "PROJECT_ID:REGION_ID:INSTANCE_ID"
 * @param apiClient Cloud SQL Admin API client for interacting with the Cloud SQL instance
 * @param executor executor used to schedule asynchronous tasks
 * @param keyPair public/private key pair used to authenticate connections
 * @throws IllegalArgumentException if {@code connectionName} does not match the expected format
 */
CloudSqlInstance(
        String connectionName,
        SQLAdmin apiClient,
        ListeningScheduledExecutorService executor,
        ListenableFuture<KeyPair> keyPair) {
    this.connectionName = connectionName;
    Matcher matcher = CONNECTION_NAME.matcher(connectionName);
    // The offending name must be passed as the template argument; without it the "[%s]"
    // placeholder is never filled in and the error does not say which name was rejected.
    checkArgument(
            matcher.matches(),
            "[%s] Cloud SQL connection name is invalid, expected string in the form of"
                    + " \"<PROJECT_ID>:<REGION_ID>:<INSTANCE_ID>\".",
            connectionName);
    this.projectId = matcher.group(1);
    this.regionId = matcher.group(3);
    this.instanceId = matcher.group(4);

    this.apiClient = apiClient;
    this.executor = executor;
    this.keyPair = keyPair;

    // Kick off initial async jobs
    synchronized (instanceDataGuard) {
        this.currentInstanceData = performRefresh();
        this.nextInstanceData = Futures.immediateFuture(currentInstanceData);
    }
}
/**
 * Runs {@code task} on {@code executor} once every input future has completed successfully.
 *
 * <p>If any input fails, the returned future fails with the first failure reported; later
 * failures are only logged. With no input futures there is nothing to wait for, so the task
 * is submitted immediately. Without that special case no callback would ever fire and the
 * returned future would never complete.
 *
 * @param task work to run after all inputs have succeeded
 * @param executor executor that runs both the callbacks and the task
 * @param futures inputs that must all succeed before the task runs
 * @return a future that completes with the task's outcome, or fails with the first input
 *     failure
 */
private static <T> ListenableFuture<T> whenAllSucceed(
        Callable<T> task,
        ListeningScheduledExecutorService executor,
        ListenableFuture<?>... futures) {
    if (futures.length == 0) {
        return executor.submit(task);
    }

    SettableFuture<T> taskFuture = SettableFuture.create();

    // Create a countDown for all Futures to complete.
    AtomicInteger countDown = new AtomicInteger(futures.length);

    // Trigger the task when all futures are complete.
    FutureCallback<Object> runWhenInputAreComplete =
            new FutureCallback<Object>() {
                @Override
                public void onSuccess(@NullableDecl Object o) {
                    if (countDown.decrementAndGet() == 0) {
                        taskFuture.setFuture(executor.submit(task));
                    }
                }

                @Override
                public void onFailure(Throwable throwable) {
                    // setException returns false once the future is already set, i.e. for
                    // every failure after the first one.
                    if (!taskFuture.setException(throwable)) {
                        String msg = "Got more than one input failure. Logging failures after the first";
                        logger.log(Level.SEVERE, msg, throwable);
                    }
                }
            };
    for (ListenableFuture<?> future : futures) {
        Futures.addCallback(future, runWhenInputAreComplete, executor);
    }

    return taskFuture;
}
/**
 * Schedules a delayed "jump to code" for the newly selected log row.
 *
 * <p>Nothing happens unless the application frame is focused, auto-jump is enabled in the
 * configuration and the selection has stopped adjusting. When an IDE is available, a pending
 * jump is cancelled and a new one is scheduled 300 ms later.
 *
 * <p>The line number is parsed only when the log record's line text consists of digits. Any
 * other text leaves the line absent instead of failing the whole jump with a
 * {@link NumberFormatException}.
 *
 * @param e the selection event; its first index identifies the row to jump to
 */
@Override
public void valueChanged(ListSelectionEvent e) {
    boolean hasFocus = otrosApplication.getApplicationJFrame().isFocused();
    final boolean enabled = otrosApplication.getConfiguration().getBoolean(ConfKeys.JUMP_TO_CODE_AUTO_JUMP_ENABLED, false);
    if (!hasFocus || !enabled || e.getValueIsAdjusting()) {
        return;
    }
    try {
        final LogData logData = dataTableModel.getLogData(table.convertRowIndexToModel(e.getFirstIndex()));

        Optional<Integer> line = Optional.empty();
        final String rawLine = logData.getLine();
        // Digits only: letters would make Integer.valueOf throw.
        if (StringUtils.isNotBlank(rawLine) && StringUtils.isNumeric(rawLine)) {
            line = Optional.of(Integer.valueOf(rawLine));
        }

        final LocationInfo li = new LocationInfo(
                Optional.ofNullable(logData.getClazz()).orElseGet(logData::getLoggerName),
                logData.getMethod(), logData.getFile(),
                line,
                Optional.ofNullable(logData.getMessage()));

        final JumpToCodeService jumpToCodeService = otrosApplication.getServices().getJumpToCodeService();
        final boolean ideAvailable = jumpToCodeService.isIdeAvailable();
        if (ideAvailable) {
            // Drop a jump that is still waiting so that only the latest selection wins.
            scheduledJump.ifPresent(pending -> pending.cancel(false));

            ListeningScheduledExecutorService scheduledExecutorService = otrosApplication.getServices().getTaskSchedulerService().getListeningScheduledExecutorService();
            delayMs = 300;
            ListenableScheduledFuture<?> jump = scheduledExecutorService.schedule(
                    new JumpRunnable(li, jumpToCodeService), delayMs, TimeUnit.MILLISECONDS
            );
            scheduledJump = Optional.of(jump);
        }
    } catch (Exception e1) {
        // The logger already records the stack trace; no separate printStackTrace() needed.
        LOGGER.warn("Can't perform jump to code: " + e1.getMessage(), e1);
    }
}
/**
 * Wraps the given scheduled executor.
 *
 * <p>An executor that already is a {@code ListeningScheduledExecutorService} is used as-is;
 * any other one is adapted with {@code MoreExecutors.listeningDecorator}.
 */
public MDCPropagatingScheduledExecutorService(ScheduledExecutorService executorService) {
    this.executorService = executorService instanceof ListeningScheduledExecutorService
            ? (ListeningScheduledExecutorService) executorService
            : MoreExecutors.listeningDecorator(executorService);
}
/**
 * Creates the default scheduler for a client: a four-thread pool wrapped as an "exiting"
 * executor with a zero-second termination timeout.
 *
 * <p>Each call increments {@code clientCounter}; the resulting number is part of the thread
 * names.
 */
private static ListeningScheduledExecutorService defaultExecutorService() {
    final int clientNumber = clientCounter.incrementAndGet();
    final String threadNameFormat = "helios-client-" + clientNumber + "-thread-%d";

    final ThreadFactory clientThreads = new ThreadFactoryBuilder()
            .setNameFormat(threadNameFormat)
            .build();
    final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(4, clientThreads);

    return MoreExecutors.listeningDecorator(
            MoreExecutors.getExitingScheduledExecutorService(pool, 0, SECONDS));
}
/**
 * Creates a dispatcher that keeps the given delegate, scheduler, clock and the two timing
 * values (both in milliseconds).
 */
private RetryingRequestDispatcher(final RequestDispatcher delegate,
        final ListeningScheduledExecutorService executorService,
        final Clock clock,
        final long retryTimeoutMillis,
        final long delayMillis) {
    // Collaborators.
    this.delegate = delegate;
    this.clock = clock;
    this.executorService = executorService;

    // Timing.
    this.delayMillis = delayMillis;
    this.retryTimeoutMillis = retryTimeoutMillis;
}
/**
 * Creates a provider from the given environment, cache and optional remote-execution pieces.
 *
 * @param env command environment; must not be {@code null}
 * @param cache remote cache; must not be {@code null}
 * @param executor remote executor; may be {@code null}
 * @param retryScheduler retry scheduler; may be {@code null}
 * @param digestUtil digest utility stored as-is
 * @param logDir log directory; may be {@code null}
 */
private RemoteActionContextProvider(
        CommandEnvironment env,
        RemoteCache cache,
        @Nullable GrpcRemoteExecutor executor,
        @Nullable ListeningScheduledExecutorService retryScheduler,
        DigestUtil digestUtil,
        @Nullable Path logDir) {
    // Required collaborators are checked first, env before cache.
    Preconditions.checkNotNull(env, "env");
    Preconditions.checkNotNull(cache, "cache");
    this.env = env;
    this.cache = cache;

    // Everything else is stored without validation.
    this.digestUtil = digestUtil;
    this.executor = executor;
    this.retryScheduler = retryScheduler;
    this.logDir = logDir;
}