下面列出了 com.google.common.cache.RemovalCause 类的 API 用法示例代码,您也可以点击链接到 GitHub 查看源代码。
/**
 * Builds the shard-writer cache and schedules its periodic clean-up.
 *
 * <p>Entries expire {@code IDLE_TIMEOUT_MS} milliseconds after they were written. When an entry
 * leaves the cache for any reason other than an explicit invalidation, the removal is logged and
 * the writer's producer is closed.
 */
@SuppressWarnings("FutureReturnValueIgnored")
ShardWriterCache() {
    this.cache =
        CacheBuilder.newBuilder()
            .expireAfterWrite(IDLE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
            .<Integer, ShardWriter<K, V>>removalListener(
                notification -> {
                    if (notification.getCause() != RemovalCause.EXPLICIT) {
                        // Use the parameterized type the listener is declared with
                        // instead of a raw ShardWriter.
                        ShardWriter<K, V> writer = notification.getValue();
                        LOG.info(
                            "{} : Closing idle shard writer {} after 1 minute of idle time.",
                            writer.shard,
                            writer.producerName);
                        writer.producer.close();
                    }
                })
            .build();

    // The cache only runs maintenance when it is touched, so trigger cache.cleanUp()
    // at a fixed rate of CLEAN_UP_CHECK_INTERVAL_MS (10 seconds per the original comment).
    SCHEDULED_CLEAN_UP_THREAD.scheduleAtFixedRate(
        cache::cleanUp,
        CLEAN_UP_CHECK_INTERVAL_MS,
        CLEAN_UP_CHECK_INTERVAL_MS,
        TimeUnit.MILLISECONDS);
}
public VertexCache(long maxCacheSize, int concurrencyLevel, int initialDirtySize) {
volatileVertices = new ConcurrentHashMap<>(initialDirtySize);
cache = CacheBuilder.newBuilder()
.maximumSize(maxCacheSize)
.concurrencyLevel(concurrencyLevel)
.removalListener((RemovalListener<Long, InternalVertex>) notification -> {
if (notification.getCause() == RemovalCause.EXPLICIT) { //Due to invalidation at the end
return;
}
//We get here if the entry is evicted because of size constraint or replaced through add
//i.e. RemovalCause.SIZE or RemovalCause.REPLACED
InternalVertex v = notification.getValue();
if (((AbstractVertex) v).isTxOpen() && (v.isModified() || v.isRemoved())) { //move vertex to volatile map if we cannot lose track of it
volatileVertices.putIfAbsent(notification.getKey(), v);
}
})
.build();
}
/**
 * Registers this provider, creates the pending-operation cache and attaches the
 * listener to the controller.
 *
 * <p>Pending operations expire {@code TIMEOUT} seconds after being written; an expired
 * operation is reported to the provider service as failed with a timeout reason.
 */
@Activate
public void activate() {
    providerService = providerRegistry.register(this);

    pendingOperations = CacheBuilder.newBuilder()
            .expireAfterWrite(TIMEOUT, TimeUnit.SECONDS)
            .removalListener((RemovalNotification<Long, MeterOperation> removed) -> {
                // Only expirations count as failures.
                if (removed.getCause() != RemovalCause.EXPIRED) {
                    return;
                }
                log.debug("Expired on meter provider. Meter key {} and operation {}",
                          removed.getKey(), removed.getValue());
                providerService.meterOperationFailed(removed.getValue(),
                                                     MeterFailReason.TIMEOUT);
            })
            .build();

    controller.addEventListener(listener);
    controller.addListener(listener);
    // Create a stats collection for every switch the controller currently reports.
    controller.getSwitches().forEach(sw -> createStatsCollection(sw));
}
/**
 * Handles offers leaving the offer cache for any reason other than explicit removal.
 *
 * <p>While holding the {@code offerCache} monitor, an offer still in the
 * {@code AVAILABLE} state is declined; any other offer is expired.
 *
 * @param notification the cache removal notification
 */
@Override
public void onRemoval(RemovalNotification<String, CachedOffer> notification) {
    final RemovalCause cause = notification.getCause();
    if (cause == RemovalCause.EXPLICIT) {
        return;
    }

    LOG.debug("Cache removal for {} due to {}", notification.getKey(), cause);

    synchronized (offerCache) {
        final CachedOffer offer = notification.getValue();
        final boolean stillAvailable = offer.offerState == OfferState.AVAILABLE;
        if (stillAvailable) {
            declineOffer(offer);
        } else {
            offer.expire();
        }
    }
}
/**
 * Loads four keys into a cache limited to three entries and checks that any
 * eviction seen by the removal listener is reported with cause {@code SIZE}.
 */
@Test
public void whenEntryRemovedFromCache_thenNotify() {
    final CacheLoader<String, String> upperCaseLoader = new CacheLoader<String, String>() {
        @Override
        public final String load(final String key) {
            return key.toUpperCase();
        }
    };

    final RemovalListener<String, String> evictionChecker = new RemovalListener<String, String>() {
        @Override
        public void onRemoval(final RemovalNotification<String, String> removal) {
            if (!removal.wasEvicted()) {
                return;
            }
            final String causeName = removal.getCause().name();
            assertEquals(RemovalCause.SIZE.toString(), causeName);
        }
    };

    final LoadingCache<String, String> cache = CacheBuilder.newBuilder()
            .maximumSize(3)
            .removalListener(evictionChecker)
            .build(upperCaseLoader);

    // One key more than the maximum size.
    for (final String key : new String[] {"first", "second", "third", "last"}) {
        cache.getUnchecked(key);
    }

    assertEquals(3, cache.size());
}
/**
 * Creates the net server, prepares the reply-handling state and starts listening.
 *
 * <p>The server starts listening only after the command-task cache, the local
 * queues and the workers have been assigned. The previous order called
 * {@code listen} first, so a connection accepted early could reach the
 * handler while those fields were still unset.
 *
 * @param port                    TCP port to listen on
 * @param completionSourceTimeout time after write, in milliseconds, after which an
 *                                entry in {@code commandTaskDict} expires and is
 *                                passed to {@code processTimeoutCommand}
 */
public void startServer(int port, int completionSourceTimeout) {
    netServer = vertx.createNetServer();
    netServer.connectHandler(sock -> {
        RecordParser parser = RecordParser.newDelimited(SysProperties.DELIMITED, sock);
        parser.endHandler(v -> sock.close()).exceptionHandler(t -> {
            logger.error("Failed to start NetServer", t);
            sock.close();
        }).handler(buffer -> {
            RemoteReply name = buffer.toJsonObject().mapTo(RemoteReply.class);
            processRequestInternal(name);
        });
    });

    // Only expired entries are treated as timed-out commands.
    commandTaskDict = CacheBuilder.newBuilder().removalListener((RemovalListener<String, CommandTaskCompletionSource>) notification -> {
        if (notification.getCause().equals(RemovalCause.EXPIRED)) {
            processTimeoutCommand(notification.getKey(), notification.getValue());
        }
    }).expireAfterWrite(completionSourceTimeout, TimeUnit.MILLISECONDS).build();

    commandExecutedMessageLocalQueue = new LinkedBlockingQueue<>();
    domainEventHandledMessageLocalQueue = new LinkedBlockingQueue<>();
    commandExecutedMessageWorker = new Worker("ProcessExecutedCommandMessage", () -> {
        processExecutedCommandMessage(commandExecutedMessageLocalQueue.take());
    });
    domainEventHandledMessageWorker = new Worker("ProcessDomainEventHandledMessage", () -> {
        processDomainEventHandledMessage(domainEventHandledMessageLocalQueue.take());
    });

    // Start accepting connections last, once all state above is in place.
    bindAddress = new InetSocketAddress(port);
    netServer.listen(port);
}
/**
 * Creates the protocol handler and its cache of open remote iterators.
 *
 * <p>Iterators expire from the cache after {@code OPEN_ITERATORS_TIMEOUT_MS_KEY}
 * milliseconds without access (measured with the supplied ticker). Any iterator removed
 * for a reason other than explicit invalidation is logged and, if it is
 * {@link Closeable}, closed; an {@link IOException} from closing is logged and ignored.
 */
PDFSProtocol(NodeEndpoint endpoint, SabotConfig config, BufferAllocator allocator, FileSystem localFS,
    boolean allowLocalAccess, Ticker ticker) {
    this.endpoint = endpoint;
    this.allocator = allocator;
    this.localFS = localFS;
    this.allowLocalAccess = allowLocalAccess;
    this.rpcTimeoutInSecs = config.getInt(RpcConstants.BIT_RPC_TIMEOUT);

    final long openIteratorsTimeoutMs = config.getMilliseconds(OPEN_ITERATORS_TIMEOUT_MS_KEY);

    this.openIterators = CacheBuilder.newBuilder()
        .ticker(ticker)
        .expireAfterAccess(openIteratorsTimeoutMs, TimeUnit.MILLISECONDS)
        .removalListener(
            (RemovalNotification<ListStatusContinuationHandle, RemoteIterator<FileStatus>> removal) -> {
                final RemovalCause cause = removal.getCause();
                if (cause != RemovalCause.EXPLICIT) {
                    logger.info("Iterator for handle {} expired (cause: {})", removal.getKey(), cause);

                    final RemoteIterator<FileStatus> expired = removal.getValue();
                    if (expired instanceof Closeable) {
                        try {
                            ((Closeable) expired).close();
                        } catch (IOException e) {
                            // Best-effort close: log and carry on.
                            logger.warn("Exception thrown when closing iterator for handle {}", removal.getKey(), e);
                        }
                    }
                }
            })
        .build();
}
/**
 * Deletes this entry's files unless the entry was merely replaced.
 *
 * @param cause why the entry left the cache
 * @param cache the cache passed on to {@code deleteFiles}
 * @throws IOException declared for failures during clean-up
 */
public void closeAndCleanup(RemovalCause cause, Cache cache)
        throws IOException
{
    // A replaced entry keeps its files.
    if (cause == RemovalCause.REPLACED) {
        return;
    }

    final String evictionMessage = "Evicting " + getRemotePath().toString() + " due to " + cause;
    log.warn(evictionMessage);
    deleteFiles(cache);
}
/**
 * Deletes the stored member descriptors for a key when its cache entry is removed explicitly.
 *
 * <p>Removals with any other cause are ignored. The result of the delete call is
 * not used.
 *
 * @param notification the cache removal notification
 */
@Override
public void onRemoval(final RemovalNotification<String, List<MemberDescriptor>> notification) {
    if (notification.getCause() == RemovalCause.EXPLICIT) {
        ProjectDatabaseHelper.deleteMemberDescriptors(notification.getKey());
    }
}
/**
 * Deletes the stored source when its cache entry is removed explicitly and the
 * source cache is enabled in the configuration.
 *
 * <p>Any exception thrown while deleting is logged and not propagated.
 *
 * @param notification the cache removal notification
 */
@Override
public void onRemoval(final RemovalNotification<File, Source> notification) {
    final RemovalCause cause = notification.getCause();
    final Config config = Config.load();

    if (!config.useSourceCache() || !cause.equals(RemovalCause.EXPLICIT)) {
        return;
    }

    final Source removedSource = notification.getValue();
    try {
        deleteSource(removedSource);
    } catch (Exception e) {
        log.catching(e);
    }
}
/**
 * Builds the host cache and schedules the periodic host update.
 *
 * <p>Hosts expire {@code HOST_EXPIRATION_SEC} seconds after their last access. For each
 * expired host, one {@code HostNotifier} per registered listener is submitted to the
 * thread pool. {@code updateHosts()} runs with a fixed delay of
 * {@code HOST_UPDATER_INTERVAL_SEC} seconds.
 */
public void init()
{
    final RemovalListener<String, ResourceHostInfo> expiryListener =
            new RemovalListener<String, ResourceHostInfo>()
            {
                @Override
                public void onRemoval( final RemovalNotification<String, ResourceHostInfo> notification )
                {
                    // Only expirations are reported to the listeners.
                    if ( notification.getCause() != RemovalCause.EXPIRED )
                    {
                        return;
                    }
                    for ( HostListener listener : hostListeners )
                    {
                        threadPool.execute( new HostNotifier( listener, notification.getValue() ) );
                    }
                }
            };

    hosts = CacheBuilder.newBuilder()
                        .expireAfterAccess( HOST_EXPIRATION_SEC, TimeUnit.SECONDS )
                        .removalListener( expiryListener )
                        .build();

    final Runnable hostRefreshTask = new Runnable()
    {
        @Override
        public void run()
        {
            updateHosts();
        }
    };

    hostUpdater.scheduleWithFixedDelay( hostRefreshTask, HOST_UPDATER_INTERVAL_SEC, HOST_UPDATER_INTERVAL_SEC,
            TimeUnit.SECONDS );
}
/**
* Creates a new instance of the {@link AutoScaleProcessor} class.
*
* @param configuration The {@link AutoScalerConfig} to use as configuration.
* @param clientFactory The {@link EventStreamClientFactory} to use to bootstrap {@link EventStreamWriter} instances.
* @param executor The Executor to use for async operations.
*/
@VisibleForTesting
AutoScaleProcessor(@NonNull AutoScalerConfig configuration, EventStreamClientFactory clientFactory,
@NonNull ScheduledExecutorService executor) {
this.configuration = configuration;
this.writer = new CompletableFuture<>();
this.clientFactory = clientFactory;
this.startInitWriter = new AtomicBoolean(false);
this.cache = CacheBuilder.newBuilder()
.initialCapacity(INITIAL_CAPACITY)
.maximumSize(MAX_CACHE_SIZE)
.expireAfterAccess(configuration.getCacheExpiry().getSeconds(), TimeUnit.SECONDS)
.removalListener(RemovalListeners.asynchronous((RemovalListener<String, Pair<Long, Long>>) notification -> {
if (notification.getCause().equals(RemovalCause.EXPIRED)) {
triggerScaleDown(notification.getKey(), true);
}
}, executor))
.build();
// Even if there is no activity, keep cleaning up the cache so that scale down can be triggered.
// caches do not perform clean up if there is no activity. This is because they do not maintain their
// own background thread.
this.cacheCleanup = executor.scheduleAtFixedRate(cache::cleanUp, 0, configuration.getCacheCleanup().getSeconds(), TimeUnit.SECONDS);
if (clientFactory != null) {
bootstrapRequestWriters(clientFactory, executor);
}
}
/**
 * Nacks a message whose cache entry expired.
 *
 * <p>The nack is scheduled on {@code ackWorker}. A warning is logged first if the
 * message was already marked as published. Removals with other causes, and
 * notifications without a value, are ignored.
 *
 * @param notification the cache removal notification
 */
private void handleCacheRemove(RemovalNotification<Long, UnconfirmedMessage> notification) {
    if (notification.getCause() != RemovalCause.EXPIRED) {
        return;
    }

    final UnconfirmedMessage expired = notification.getValue();
    if (expired == null) { //TODO figure out why this can be null??
        return;
    }

    ackWorker.schedule(() -> {
        if (expired.published) {
            log.warnWithParams("Message did not receive publish-confirm in time", "messageId", expired.props.getMessageId());
        }
        expired.nack(new TimeoutException("Message did not receive publish confirm in time"));
    });
}
/**
 * (Re)builds the cache of management-node-not-found handlers from the current
 * global configuration, carrying over the entries of the previous cache if there was one.
 *
 * <p>Entries dropped because of size, expiry or collection are logged as warnings.
 */
private synchronized void buildCache() {
    // Take the view of the previous cache (if any) before the field is reassigned.
    final Map<String, ManagementNodeNotFoundHandler> oldEntries =
            managementNodeNotFoundHandlers == null ? null : managementNodeNotFoundHandlers.asMap();

    long maxNum = CloudBusGlobalConfig.MAX_MANAGEMENTNODE_NOTFOUND_ERROR_HANDLER_NUM.value(Long.class);
    long timeout = CloudBusGlobalConfig.MAX_MANAGEMENTNODE_NOTFOUND_ERROR_HANDLER_TIMEOUT.value(Long.class);

    managementNodeNotFoundHandlers = CacheBuilder.newBuilder()
            .maximumSize(maxNum)
            .expireAfterWrite(timeout, TimeUnit.SECONDS)
            .removalListener((RemovalListener<String, ManagementNodeNotFoundHandler>) removal -> {
                final RemovalCause cause = removal.getCause();
                final boolean droppedByPolicy = cause == RemovalCause.SIZE
                        || cause == RemovalCause.EXPIRED
                        || cause == RemovalCause.COLLECTED;
                if (!droppedByPolicy) {
                    return;
                }

                ManagementNodeNotFoundHandler handler = removal.getValue();
                logger.warn(String.format("A message failing to send to the management node[uuid:%s] because the node is offline while the message being sent. Now the message is being dropped " +
                                "because the cache policy[%s] requires and the management node is not online til now. The message dump:\n %s", handler.managementNodeUuid,
                        cause, CloudBusGson.toJson(handler.message)));
            })
            .build();

    if (oldEntries != null) {
        oldEntries.forEach(managementNodeNotFoundHandlers::put);
    }

    logger.debug(String.format("build cache of ManagementNodeNotFoundHandler[maxNum:%s, timeout: %ss, current entries: %s]", maxNum, timeout, managementNodeNotFoundHandlers.size()));
}
/**
 * Callback invoked when an item is removed from the histories cache. The item is always
 * cleared from the history; its tuple is then failed or acked depending on the removal cause.
 *
 * @param notification the cache removal notification
 */
@Override
public void onRemoval(RemovalNotification<CVParticle, String> notification) {
    final CVParticle particle = notification.getKey();

    // Always drop the particle from the history, even when the removal was automatic.
    history.clear(particle, notification.getValue());

    final RemovalCause cause = notification.getCause();
    final boolean removedAutomatically = cause == RemovalCause.EXPIRED || cause == RemovalCause.SIZE;
    if (removedAutomatically) {
        // Expired or evicted for size --> fail the tuple.
        collector.fail(particle.getTuple());
    } else {
        // Every other cause is treated as an explicit removal --> ack the tuple.
        collector.ack(particle.getTuple());
    }
}
/**
 * Logs, at trace level, cached stats that were evicted from the cache.
 *
 * <p>Does nothing when trace logging is disabled or when {@code wasEvicted}
 * returns false for the removal cause.
 *
 * @param notification the cache removal notification
 */
@Override
public void onRemoval(RemovalNotification<GuidePostsKey, GuidePostsInfo> notification) {
    if (!logger.isTraceEnabled()) {
        return;
    }

    final RemovalCause cause = notification.getCause();
    if (!wasEvicted(cause)) {
        return;
    }

    GuidePostsKey key = notification.getKey();
    final Object[] logArgs = new Object[] {key, notification.getValue().getEstimatedSize(), cause};
    logger.trace("Cached stats for {} with size={}bytes was evicted due to cause={}", logArgs);
}
/**
 * Checks that {@code wasEvicted} is false for user-driven causes and true for
 * removals the cache performs on its own.
 */
@Test
public void nonEvictionsAreIgnored() {
    // Removals or updates we trigger ourselves are uninteresting; automatic
    // ones are what the listener should report.
    PhoenixStatsCacheRemovalListener listener = new PhoenixStatsCacheRemovalListener();

    // User-driven removals or updates
    final RemovalCause[] userDriven = {RemovalCause.EXPLICIT, RemovalCause.REPLACED};
    for (RemovalCause cause : userDriven) {
        assertFalse(listener.wasEvicted(cause));
    }

    // Automatic removals by the cache itself (per configuration)
    final RemovalCause[] automatic = {RemovalCause.COLLECTED, RemovalCause.EXPIRED, RemovalCause.SIZE};
    for (RemovalCause cause : automatic) {
        assertTrue(listener.wasEvicted(cause));
    }
}
/**
 * Initializes the pipeliner for a device: looks up the required services, registers
 * the application id, creates the pending-group cache and adds the group listener.
 *
 * <p>A pending group that expires (20 seconds after write) is failed with
 * {@code GROUPINSTALLATIONFAILED}.
 *
 * @param deviceId the device this pipeliner is bound to
 * @param context  the pipeliner context providing the service directory and store
 */
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
    log.debug("Initiate OLT pipeline");

    this.serviceDirectory = context.directory();
    this.deviceId = deviceId;

    flowRuleService = serviceDirectory.get(FlowRuleService.class);
    coreService = serviceDirectory.get(CoreService.class);
    groupService = serviceDirectory.get(GroupService.class);
    flowObjectiveStore = context.store();
    storageService = serviceDirectory.get(StorageService.class);

    appId = coreService.registerApplication("org.onosproject.driver.OLTPipeline");

    pendingGroups = CacheBuilder.newBuilder()
            .expireAfterWrite(20, TimeUnit.SECONDS)
            .removalListener((RemovalNotification<GroupKey, NextObjective> removed) -> {
                // Only expirations are failures.
                if (removed.getCause() != RemovalCause.EXPIRED) {
                    return;
                }
                fail(removed.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
            })
            .build();

    groupService.addListener(new InnerGroupListener());
}
/**
 * Initializes the pipeliner for a device: creates the pending-group cache, schedules the
 * group checker, looks up the required services, registers the application id and
 * initializes the pipeline.
 *
 * <p>A pending group that expires (20 seconds after write) is failed with
 * {@code GROUPINSTALLATIONFAILED}.
 *
 * @param deviceId the device this pipeliner is bound to
 * @param context  the pipeliner context providing the service directory and store
 */
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
    this.serviceDirectory = context.directory();
    this.deviceId = deviceId;

    pendingGroups = CacheBuilder.newBuilder()
            .expireAfterWrite(20, TimeUnit.SECONDS)
            .removalListener((RemovalNotification<GroupKey, NextObjective> removed) -> {
                // Only expirations are failures.
                if (removed.getCause() != RemovalCause.EXPIRED) {
                    return;
                }
                fail(removed.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
            })
            .build();

    // Run the group checker every 500 ms, starting immediately.
    groupChecker.scheduleAtFixedRate(new GroupChecker(), 0, 500, TimeUnit.MILLISECONDS);

    coreService = serviceDirectory.get(CoreService.class);
    flowRuleService = serviceDirectory.get(FlowRuleService.class);
    groupService = serviceDirectory.get(GroupService.class);
    flowObjectiveStore = context.store();

    groupService.addListener(new InnerGroupListener());

    appId = coreService.registerApplication("org.onosproject.driver.CentecV350Pipeline");

    initializePipeline();
}
/**
 * Initializes the pipeliner for a device: creates the pending-group cache, looks up the
 * required services, adds the group listener, registers the application id and sets
 * the table-miss entries.
 *
 * <p>A pending group that expires (20 seconds after write) is failed with
 * {@code GROUPINSTALLATIONFAILED}.
 *
 * @param deviceId the device this pipeliner is bound to
 * @param context  the pipeliner context providing the service directory and store
 */
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
    this.serviceDirectory = context.directory();
    this.deviceId = deviceId;

    pendingGroups = CacheBuilder.newBuilder()
            .expireAfterWrite(20, TimeUnit.SECONDS)
            .removalListener((RemovalNotification<GroupKey, NextObjective> removed) -> {
                // Only expirations are failures.
                if (removed.getCause() != RemovalCause.EXPIRED) {
                    return;
                }
                fail(removed.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
            })
            .build();

    coreService = serviceDirectory.get(CoreService.class);
    flowRuleService = serviceDirectory.get(FlowRuleService.class);
    groupService = serviceDirectory.get(GroupService.class);
    flowObjectiveStore = context.store();

    groupService.addListener(new InnerGroupListener());

    appId = coreService.registerApplication("org.onosproject.driver.SpringOpenTTP");

    setTableMissEntries();
    log.info("Spring Open TTP driver initialized");
}
/**
 * Initializes the pipeliner for a device: looks up the required services, registers
 * the application id, creates the pending-group cache and adds the group listener.
 *
 * <p>A pending group that expires (20 seconds after write) is failed with
 * {@code GROUPINSTALLATIONFAILED}.
 *
 * @param deviceId the device this pipeliner is bound to
 * @param context  the pipeliner context providing the service directory and store
 */
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
    log.debug("Initiate OLT pipeline");

    this.serviceDirectory = context.directory();
    this.deviceId = deviceId;

    flowRuleService = serviceDirectory.get(FlowRuleService.class);
    coreService = serviceDirectory.get(CoreService.class);
    groupService = serviceDirectory.get(GroupService.class);
    flowObjectiveStore = context.store();
    storageService = serviceDirectory.get(StorageService.class);

    appId = coreService.registerApplication("org.onosproject.driver.OLTPipeline");

    pendingGroups = CacheBuilder.newBuilder()
            .expireAfterWrite(20, TimeUnit.SECONDS)
            .removalListener((RemovalNotification<GroupKey, NextObjective> removed) -> {
                // Only expirations are failures.
                if (removed.getCause() != RemovalCause.EXPIRED) {
                    return;
                }
                fail(removed.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
            })
            .build();

    groupService.addListener(new InnerGroupListener());
}
/**
 * Creates the cache of in-flight batches.
 *
 * <p>Entries expire 10 seconds after being written; an expired batch is reported to the
 * provider service as completed with the entry's failed-completion result.
 *
 * @return a new, empty batch cache
 */
private Cache<Long, InternalCacheEntry> createBatchCache() {
    return CacheBuilder.newBuilder()
            .expireAfterWrite(10, TimeUnit.SECONDS)
            .removalListener((RemovalNotification<Long, InternalCacheEntry> removed) -> {
                // Only expirations are reported.
                if (removed.getCause() != RemovalCause.EXPIRED) {
                    return;
                }
                providerService.batchOperationCompleted(removed.getKey(),
                                                        removed.getValue().failedCompletion());
            })
            .build();
}
/**
 * Registers this provider, creates the pending-operation cache and adds the
 * meter listener.
 *
 * <p>Pending operations expire {@code TIMEOUT} seconds after being written; an expired
 * operation is reported as failed (timeout) to the provider service of its network.
 */
@Activate
public void activate() {
    providerRegistryService.registerProvider(this);
    internalMeterListener = new InternalMeterListener();
    idGenerator = getIdGenerator();

    pendingOperations = CacheBuilder.newBuilder()
            .expireAfterWrite(TIMEOUT, TimeUnit.SECONDS)
            .removalListener((RemovalNotification<Long, VirtualMeterOperation> removed) -> {
                // Only expirations are failures.
                if (removed.getCause() != RemovalCause.EXPIRED) {
                    return;
                }
                NetworkId networkId = removed.getValue().networkId();
                MeterOperation op = removed.getValue().operation();

                // Look up the provider service of the network the operation belongs to.
                VirtualMeterProviderService networkProviderService =
                        (VirtualMeterProviderService) providerRegistryService
                                .getProviderService(networkId, VirtualMeterProvider.class);
                networkProviderService.meterOperationFailed(op, MeterFailReason.TIMEOUT);
            })
            .build();

    meterService.addListener(internalMeterListener);
    log.info("Started");
}
/**
 * Initializes the pipeliner for a device and creates the pending next-objective cache.
 *
 * <p>A pending objective that expires (20 seconds after write) has its context, if
 * present, notified with {@code FLOWINSTALLATIONFAILED}.
 *
 * @param deviceId the device this pipeliner is bound to
 * @param context  the pipeliner context (not used here)
 */
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
    this.deviceId = deviceId;

    pendingNext = CacheBuilder.newBuilder()
            .expireAfterWrite(20, TimeUnit.SECONDS)
            .removalListener((RemovalNotification<Integer, NextObjective> removed) -> {
                // Only expirations are failures.
                if (removed.getCause() != RemovalCause.EXPIRED) {
                    return;
                }
                removed.getValue().context()
                        .ifPresent(ctx -> ctx.onError(removed.getValue(),
                                                      ObjectiveError.FLOWINSTALLATIONFAILED));
            })
            .build();
}
/**
 * Logs cache removals that were not requested explicitly.
 *
 * @param notification the cache removal notification
 */
private void onRemoved(RemovalNotification notification) {
    if (notification.getCause() == RemovalCause.EXPLICIT) {
        // Explicit removals are not logged.
        return;
    }
    logger.info("voice context for {} removed from cache because {}", notification.getKey(), notification.getCause());
}
/**
 * Removes the key of a removed cache entry from {@code availableSlots}, unless
 * the entry was only replaced.
 *
 * @param removalNotification the cache removal notification
 */
private void onRemovedFromCache(
        final RemovalNotification<UnsignedLong, BeaconState> removalNotification) {
    // A replaced entry still has a value under the same key.
    if (removalNotification.getCause() == RemovalCause.REPLACED) {
        return;
    }
    availableSlots.remove(removalNotification.getKey());
}
/**
 * Cleans up after a state whose cache entry was collected.
 *
 * <p>Any removal cause other than {@code COLLECTED} fails the state check
 * before clean-up runs.
 *
 * @param notification the cache removal notification
 */
@Override
public void onRemoval(RemovalNotification<State, Set<KeyAndSource>> notification) {
    final boolean collected = notification.getCause() == RemovalCause.COLLECTED;
    Preconditions.checkState(collected);
    cleanUpForCollectedState(notification.getValue());
}
/**
 * Tells whether a removal cause is anything other than {@code EXPLICIT} or {@code REPLACED}.
 *
 * @param cause the removal cause to classify
 * @return {@code false} for {@code EXPLICIT} and {@code REPLACED}, {@code true} otherwise
 */
boolean wasEvicted(RemovalCause cause) {
    // Per the original author's note, RemovalCause has an equivalent method
    // that is not exposed, hence this local copy.
    final boolean userDriven = cause == RemovalCause.EXPLICIT || cause == RemovalCause.REPLACED;
    return !userDriven;
}