下面列出了 com.google.common.cache.RemovalListeners 类的 API 用法示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Builds a connection manager whose data sources live in an access-expiring cache.
 *
 * <p>Cache entries expire after {@code POOL_EXPIRATION_TIMEOUT_HOURS} hours without access.
 * Removed pools are handed to a dedicated cached thread pool, which waits for their active
 * connections to drain (bounded by {@code CLEANUP_TIMEOUT_NANOS}) before closing them.
 *
 * @param factory            creates the data source for a pool descriptor missing from the cache
 * @param ticker             time source used both for cache expiration and for timing the drain wait
 * @param sleepIntervalNanos pause, in nanoseconds, between successive active-connection checks
 */
ConnectionManager(DataSourceFactory factory, Ticker ticker, long sleepIntervalNanos) {
    this.datasourceClosingExecutor = Executors.newCachedThreadPool();

    final RemovalListener<PoolDescriptor, HikariDataSource> closeRemovedPool = removed -> {
        final HikariDataSource pool = removed.getValue();
        LOG.debug("Processing cache removal of pool {} for server {} and user {} with cause {}",
                pool.getPoolName(),
                removed.getKey().getServer(),
                removed.getKey().getUser(),
                removed.getCause().toString());

        // The pool may have left the cache while a query is still running on it: poll until
        // every connection is idle, but never for longer than CLEANUP_TIMEOUT_NANOS.
        final long waitStartedAt = ticker.read();
        boolean timedOut = false;
        while (!timedOut && pool.getHikariPoolMXBean().getActiveConnections() > 0) {
            timedOut = (ticker.read() - waitStartedAt) > CLEANUP_TIMEOUT_NANOS;
            if (timedOut) {
                LOG.warn("Pool {} has active connections for too long, destroying it", pool.getPoolName());
            } else {
                Uninterruptibles.sleepUninterruptibly(sleepIntervalNanos, TimeUnit.NANOSECONDS);
            }
        }

        // Reached either because the pool is idle or because the wait timed out.
        LOG.debug("Destroying the pool {}", pool.getPoolName());
        pool.close();
    };

    this.dataSources = CacheBuilder.newBuilder()
            .ticker(ticker)
            .expireAfterAccess(POOL_EXPIRATION_TIMEOUT_HOURS, TimeUnit.HOURS)
            .removalListener(RemovalListeners.asynchronous(closeRemovedPool, datasourceClosingExecutor))
            .build(CacheLoader.from(descriptor -> factory.createDataSource(descriptor)));
}
/**
* Creates a new instance of the {@link AutoScaleProcessor} class.
*
* @param configuration The {@link AutoScalerConfig} to use as configuration.
* @param clientFactory The {@link EventStreamClientFactory} to use to bootstrap {@link EventStreamWriter} instances.
* @param executor The Executor to use for async operations.
*/
@VisibleForTesting
AutoScaleProcessor(@NonNull AutoScalerConfig configuration, EventStreamClientFactory clientFactory,
@NonNull ScheduledExecutorService executor) {
this.configuration = configuration;
this.writer = new CompletableFuture<>();
this.clientFactory = clientFactory;
this.startInitWriter = new AtomicBoolean(false);
this.cache = CacheBuilder.newBuilder()
.initialCapacity(INITIAL_CAPACITY)
.maximumSize(MAX_CACHE_SIZE)
.expireAfterAccess(configuration.getCacheExpiry().getSeconds(), TimeUnit.SECONDS)
.removalListener(RemovalListeners.asynchronous((RemovalListener<String, Pair<Long, Long>>) notification -> {
if (notification.getCause().equals(RemovalCause.EXPIRED)) {
triggerScaleDown(notification.getKey(), true);
}
}, executor))
.build();
// Even if there is no activity, keep cleaning up the cache so that scale down can be triggered.
// caches do not perform clean up if there is no activity. This is because they do not maintain their
// own background thread.
this.cacheCleanup = executor.scheduleAtFixedRate(cache::cleanUp, 0, configuration.getCacheCleanup().getSeconds(), TimeUnit.SECONDS);
if (clientFactory != null) {
bootstrapRequestWriters(clientFactory, executor);
}
}
/**
 * Creates a new cache.
 * This constructor is private because external access should be made through
 * {@link #getSharedCache(long, CanvasDataLoader)}.
 *
 * @param  kilobyteCapacity  capacity of the cache; values below 1 are raised to 1.
 * @param  canvasDataLoader  loader implementation for the cache.
 * @param  recordStats       indicates whether the cache should record statistics.
 *
 * @throws IllegalArgumentException
 *   declared by the signature; presumably raised while building the cache (verify).
 * @throws IllegalStateException
 *   if any errors occur.
 */
private CanvasDataCache(final long kilobyteCapacity,
                        final CanvasDataLoader canvasDataLoader,
                        final boolean recordStats)
        throws IllegalArgumentException, IllegalStateException {

    // never allow a capacity smaller than one kilobyte
    this.kilobyteCapacity = Math.max(1, kilobyteCapacity);

    this.weigher = (key, value) -> {
        final long sizeInKb = value.getKilobytes();

        // hopefully we'll never have > 2000 gigabyte file,
        // but if so it simply won't be fairly weighted
        if (sizeInKb > Integer.MAX_VALUE) {
            LOG.warn("weightOf: truncating weight for " + sizeInKb + " Kb item " + value);
            return Integer.MAX_VALUE;
        }

        // zero weights are not supported, so an empty file is given weight 1
        return (sizeInKb == 0) ? 1 : (int) sizeInKb;
    };

    this.canvasDataLoader = canvasDataLoader;

    // removed entries are cleaned up on a separate thread pool,
    // via the asynchronous wrapper around the listener
    final ExecutorService removalService = Executors.newFixedThreadPool(4);

    final RemovalListener<CanvasId, CachedCanvasData> removalListener = removal -> {
        final CachedCanvasData removedData = removal.getValue();
        if (removedData == null) {
            return;
        }
        removedData.remove();
    };

    this.asyncRemovalListener = RemovalListeners.asynchronous(removalListener, removalService);

    // must run last: the fields assigned above are all set before the cache is built
    this.buildCache(recordStats);

    LOG.info("<init>: exit");
}
/**
 * Activates the component: sets up one expiring queue-head cache per objective kind
 * (filtering, forwarding, next), each with its own single-threaded removal-event executor,
 * starts a periodic clean-up of those caches, and swaps the store delegate.
 *
 * @param context component context, passed through to the superclass activation
 */
@Activate
protected void activate(ComponentContext context) {
    super.activate(context);

    // Shared by all three caches. An evicted entry (EXPIRED, COLLECTED or SIZE) means the
    // objective never completed, so its context is notified of an installation timeout.
    // EXPLICIT (objective completed correctly) and REPLACED (a pending forward or next
    // objective got executed) require no action.
    final RemovalListener<ObjectiveQueueKey, Objective> removalListener = removed -> {
        if (!removed.wasEvicted()) {
            return;
        }
        final Objective timedOut = removed.getValue();
        timedOut.context().ifPresent(ctx -> ctx.onError(timedOut, ObjectiveError.INSTALLATIONTIMEOUT));
    };

    // Filtering objectives
    filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
    filtObjQueueHead = CacheBuilder.newBuilder()
            .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
            .removalListener(RemovalListeners.asynchronous(removalListener, filtCacheEventExecutor))
            .build();

    // Forwarding objectives
    fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
    fwdObjQueueHead = CacheBuilder.newBuilder()
            .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
            .removalListener(RemovalListeners.asynchronous(removalListener, fwdCacheEventExecutor))
            .build();

    // Next objectives
    nextCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-next", log));
    nextObjQueueHead = CacheBuilder.newBuilder()
            .expireAfterWrite(objTimeoutMs, TimeUnit.MILLISECONDS)
            .removalListener(RemovalListeners.asynchronous(removalListener, nextCacheEventExecutor))
            .build();

    // Clean all three caches once per timeout period, starting immediately.
    final Runnable cleanAllQueueHeads = () -> {
        filtObjQueueHead.cleanUp();
        fwdObjQueueHead.cleanUp();
        nextObjQueueHead.cleanUp();
    };
    cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/flowobj", "cache-cleaner", log));
    cacheCleaner.scheduleAtFixedRate(cleanAllQueueHeads, 0, objTimeoutMs, TimeUnit.MILLISECONDS);

    // Replace the store delegate installed by the superclass with this class's delegate, so
    // that pendingForward and pendingNext are resubmitted to execute().
    flowObjectiveStore.unsetDelegate(super.delegate);
    flowObjectiveStore.setDelegate(delegate);
}