下面列出了怎么用com.google.common.util.concurrent.AtomicLongMap的API类实例代码及写法,或者点击链接到github查看源代码。
private List<Predicate> getSortedPredicates() {
long[] counts = getPredicateCounts();
AtomicLongMap<Predicate> pCounts = AtomicLongMap.create();
for (int i = 0; i < counts.length; ++i) {
pCounts.put(PredicateSet.getPredicate(i), counts[i]);
}
List<Predicate> pOrder = new ArrayList<>(addablePredicates.size());
for (Predicate p : addablePredicates) {
pOrder.add(p);
}
pOrder.sort(new Comparator<Predicate>() {
@Override
public int compare(Predicate o1, Predicate o2) {
return Long.compare(pCounts.get(o2), pCounts.get(o1));
}
});
return pOrder;
}
public static EurostagFakeNodes build(Network network, EurostagEchExportConfig config) {
Objects.requireNonNull(network);
Objects.requireNonNull(config);
BiMap<String, String> fakeNodesMap = HashBiMap.create(new HashMap<>());
AtomicLongMap<String> countUsesMap = AtomicLongMap.create();
//adds 2 default fake nodes
fakeNodesMap.put(EchUtil.FAKE_NODE_NAME1, EchUtil.FAKE_NODE_NAME1);
countUsesMap.getAndIncrement(EchUtil.FAKE_NODE_NAME1);
fakeNodesMap.put(EchUtil.FAKE_NODE_NAME2, EchUtil.FAKE_NODE_NAME2);
countUsesMap.getAndIncrement(EchUtil.FAKE_NODE_NAME2);
Identifiables.sort(network.getVoltageLevels()).stream().map(VoltageLevel::getId).forEach(vlId ->
fakeNodesMap.put(vlId, newEsgId(fakeNodesMap, vlId)));
return new EurostagFakeNodes(fakeNodesMap, countUsesMap, network);
}
@Test
public void testIncrement() throws Exception {
AtomicLongMap<String> cache = AtomicLongMap.create();
cache.addAndGet("a", 1L);
cache.addAndGet("a", 2L);
cache.addAndGet("b", 5L);
Map<String, Long> remove = AtomicLongMapUtils.remove(cache);
Assert.assertEquals((long) remove.get("a"), 3L);
Assert.assertEquals((long) remove.get("b"), 5L);
cache.addAndGet("a", 1L);
Map<String, Long> remove2 = AtomicLongMapUtils.remove(cache);
Assert.assertEquals((long) remove2.get("a"), 1L);
}
private void dumpAllState(DumpData dumpData) {
AtomicLongMap<Integer> allStateMap = AtomicLongMap.create();
List<Map<Integer, Long>> allRecord = ImmutableList.of(dumpData.getBlockedTimes(), dumpData.getRunnableCpuTimes(),
dumpData.getTimedWaitingTimes(), dumpData.getWaitingTimes());
for (Map<Integer, Long> cpuTime : allRecord) {
for (Map.Entry<Integer, Long> entry : cpuTime.entrySet()) {
allStateMap.addAndGet(entry.getKey(), entry.getValue());
}
}
doDump(allStateMap.asMap(), Manager.getAllStatePath(), false);
doDump(allStateMap.asMap(), Manager.getFilterAllStatePath(), true);
}
public ItemFastSlowRateLimiter(Duration fastDelay, Duration slowDelay, int maxFastAttempts) {
this.fastDelay = fastDelay;
this.slowDelay = slowDelay;
this.maxFastAttempts = maxFastAttempts;
failures = AtomicLongMap.create();
}
private Map<IBitSet, List<DenialConstraint>> groupDCs(DenialConstraintSet set,
ArrayList<PredicatePair> sortedPredicatePairs, IndexProvider<PartitionRefiner> indexProvider,
AtomicLongMap<PartitionRefiner> selectivityCount) {
Map<IBitSet, List<DenialConstraint>> predicateDCMap = new HashMap<>();
HashMap<PredicatePair, Integer> prios = new HashMap<>();
for (int i = 0; i < sortedPredicatePairs.size(); ++i) {
prios.put(sortedPredicatePairs.get(i), Integer.valueOf(i));
}
for (DenialConstraint dc : set) {
Set<PartitionRefiner> refinerSet = getRefinerSet(prios, dc);
predicateDCMap.computeIfAbsent(indexProvider.getBitSet(refinerSet), (Set) -> new ArrayList<>()).add(dc);
}
return predicateDCMap;
}
private AtomicLongMap<PartitionRefiner> createSelectivityEstimation(IEvidenceSet sampleEvidence,
Set<PredicatePair> predicatePairs) {
AtomicLongMap<PartitionRefiner> selectivityCount = AtomicLongMap.create();
for (PredicateBitSet ps : sampleEvidence) {
int count = (int) sampleEvidence.getCount(ps);
ps.forEach(p -> {
selectivityCount.addAndGet(p, count);
});
for (PredicatePair pair : predicatePairs)
if (pair.bothContainedIn(ps)) {
selectivityCount.addAndGet(pair, sampleEvidence.getCount(ps));
}
}
return selectivityCount;
}
public TermVectorsFilter(Fields termVectorsByField, Fields topLevelFields, Set<String> selectedFields, @Nullable AggregatedDfs dfs) {
this.fields = termVectorsByField;
this.topLevelFields = topLevelFields;
this.selectedFields = selectedFields;
this.dfs = dfs;
this.scoreTerms = new HashMap<>();
this.sizes = AtomicLongMap.create();
this.similarity = new DefaultSimilarity();
}
private void tick0() {
final Instant t0 = time.get();
final Map<String, Resource> resources;
final Optional<Long> globalConcurrency;
final StyxConfig config;
try {
config = storage.config();
globalConcurrency = config.globalConcurrency();
resources = storage.resources().stream().collect(toMap(Resource::id, identity()));
} catch (IOException e) {
log.warn("Failed to read from storage", e);
return;
}
globalConcurrency.ifPresent(
concurrency ->
resources.put(GLOBAL_RESOURCE_ID,
Resource.create(GLOBAL_RESOURCE_ID, concurrency)));
var activeInstances = stateManager.listActiveInstances();
var workflows = new ConcurrentHashMap<WorkflowId, Optional<Workflow>>();
// Note: not a strongly consistent number, so the graphed value can be imprecise or show
// exceeded limit even if the real usage never exceeded the limit.
var currentResourceUsage = AtomicLongMap.<String>create();
var currentResourceDemand = AtomicLongMap.<String>create();
processInstances(config, resources, workflows, activeInstances, currentResourceUsage, currentResourceDemand);
// TODO: stats might be inaccurate if some instances fail processing
updateResourceStats(resources, currentResourceUsage);
currentResourceDemand.asMap().forEach(stats::recordResourceDemanded);
final long durationMillis = t0.until(time.get(), ChronoUnit.MILLIS);
stats.recordTickDuration(TICK_TYPE, durationMillis);
tracer.getCurrentSpan().addAnnotation("processed",
Map.of("instances", AttributeValue.longAttributeValue(activeInstances.size())));
}
private void updateResourceStats(Map<String, Resource> resources,
AtomicLongMap<String> currentResourceUsage) {
resources.values().forEach(r -> stats.recordResourceConfigured(r.id(), r.concurrency()));
currentResourceUsage.asMap().forEach(stats::recordResourceUsed);
Sets.difference(resources.keySet(), currentResourceUsage.asMap().keySet())
.forEach(r -> stats.recordResourceUsed(r, 0));
}
@Disabled
@Test
void testNoLock() {
boolean[] success = { false };
ThrowableConsumer<AtomicLongMap<String>, Throwable> consumer = it -> {
assertEquals(1000, it.get("s1"));
assertEquals(100, it.get("s2"));
success[0] = true;
};
BufferTrigger<String> bufferTrigger = BufferTrigger.<String, AtomicLongMap<String>> simple()
.setContainer(AtomicLongMap::create, (c, e) -> {
c.incrementAndGet(e);
return true;
})
.disableSwitchLock()
.interval(1, SECONDS)
.consumer(consumer)
.build();
for (int i = 0; i < 1000; i++) {
bufferTrigger.enqueue("s1");
}
for (int i = 0; i < 100; i++) {
bufferTrigger.enqueue("s2");
}
bufferTrigger.manuallyDoTrigger();
assertTrue(success[0]);
}
public static <T> Map<T, Long> remove(AtomicLongMap<T> atomicLongMap) {
final Map<T, Long> view = atomicLongMap.asMap();
// view.size() is not recommended, cache entry is striped and volatile field
final List<T> keySnapshot = keySnapshot(view);
return remove(atomicLongMap, keySnapshot);
}
@Test
public void testIntegerMax() throws Exception {
AtomicLongMap<String> cache = AtomicLongMap.create();
cache.addAndGet("a", 1L);
cache.addAndGet("a", 2L);
cache.addAndGet("b", 5L);
}
@Test
public void testIntegerMin() throws Exception {
AtomicLongMap<String> cache = AtomicLongMap.create();
cache.addAndGet("a", 1L);
cache.addAndGet("a", 2L);
cache.addAndGet("b", 5L);
}
/**
* 以Guava的AtomicLongMap,实现线程安全的HashMap<E,AtomicLong>结构的Counter
*/
public static <E> AtomicLongMap<E> createConcurrentCounterMap() {
return AtomicLongMap.create();
}
/**
* 以Guava的AtomicLongMap,实现线程安全的HashMap<E,AtomicLong>结构的Counter
*/
public static <E> AtomicLongMap<E> createConcurrentCounterMap() {
return AtomicLongMap.create();
}
public ItemExponentialFailureRateLimiter(Duration baseDelay, Duration maxDelay) {
this.baseDelay = baseDelay;
this.maxDelay = maxDelay;
failures = AtomicLongMap.create();
}
public HashEvidenceSet complete(DenialConstraintSet set, IEvidenceSet sampleEvidence, IEvidenceSet fullEvidence) {
log.info("Checking " + set.size() + " DCs.");
log.info("Building selectivity estimation");
// frequency estimation predicate pairs
Multiset<PredicatePair> paircountDC = frequencyEstimationForPredicatePairs(set);
// selectivity estimation for predicates & predicate pairs
AtomicLongMap<PartitionRefiner> selectivityCount = createSelectivityEstimation(sampleEvidence,
paircountDC.elementSet());
ArrayList<PredicatePair> sortedPredicatePairs = getSortedPredicatePairs(paircountDC, selectivityCount);
IndexProvider<PartitionRefiner> indexProvider = new IndexProvider<>();
log.info("Grouping DCs..");
Map<IBitSet, List<DenialConstraint>> predicateDCMap = groupDCs(set, sortedPredicatePairs, indexProvider,
selectivityCount);
int[] refinerPriorities = getRefinerPriorities(selectivityCount, indexProvider, predicateDCMap);
SuperSetWalker walker = new SuperSetWalker(predicateDCMap.keySet(), refinerPriorities);
log.info("Calculating partitions..");
HashEvidenceSet resultEv = new HashEvidenceSet();
for (PredicateBitSet i : fullEvidence)
resultEv.add(i);
ClusterPair startPartition = StrippedPartition.getFullParition(input.getLineCount());
int[][] values = input.getInts();
IEJoin iejoin = new IEJoin(values);
PartitionEvidenceSetBuilder builder = new PartitionEvidenceSetBuilder(predicates, values);
long startTime = System.nanoTime();
walker.walk((inter) -> {
if((System.nanoTime() - startTime) >TimeUnit.MINUTES.toNanos(120))
return;
Consumer<ClusterPair> consumer = (clusterPair) -> {
List<DenialConstraint> currentDCs = predicateDCMap.get(inter.currentBits);
if (currentDCs != null) {
// EtmPoint point = etmMonitor.createPoint("EVIDENCES");
builder.addEvidences(clusterPair, resultEv);
// point.collect();
} else {
inter.nextRefiner.accept(clusterPair);
}
};
PartitionRefiner refiner = indexProvider.getObject(inter.newRefiner);
// System.out.println(refiner);
ClusterPair partition = inter.clusterPair != null ? inter.clusterPair : startPartition;
partition.refine(refiner, iejoin, consumer);
});
return resultEv;
// return output;
}
public AtomicLongMap<PredicateSet> getEvidences() {
return evidences;
}
public void setEvidences(AtomicLongMap<PredicateSet> evidences) {
this.evidences = evidences;
}
/**
* 以Guava的AtomicLongMap,实现线程安全的HashMap<E,AtomicLong>结构的Counter
*/
public static <E> AtomicLongMap<E> createConcurrentMapCounter() {
return AtomicLongMap.create();
}
private EurostagFakeNodes(Map<String, String> fakeNodesMap, AtomicLongMap<String> countUsesMap, Network network) {
this.network = network;
this.fakeNodesMap = HashBiMap.create(fakeNodesMap);
this.countUsesMap = countUsesMap;
}
private void processInstance(StyxConfig config, Map<String, Resource> resources,
ConcurrentMap<WorkflowId, Optional<Workflow>> workflows, WorkflowInstance instance,
ConcurrentMap<String, Boolean> resourceExhaustedCache,
AtomicLongMap<String> currentResourceUsage,
AtomicLongMap<String> currentResourceDemand) {
log.debug("Processing instance: {}", instance);
// Get the run state or exit if it does not exist
var runStateOpt = stateManager.getActiveState(instance);
if (runStateOpt.isEmpty()) {
return;
}
var runState = runStateOpt.orElseThrow();
// Look up the resources that are used by this workflow
// Account current resource usage
if (StateUtil.isConsumingResources(runState.state())) {
runState.data().resourceIds().ifPresent(ids -> ids.forEach(currentResourceUsage::incrementAndGet));
}
// Exit if this instance is not eligible for dequeue
if (!shouldExecute(runState)) {
return;
}
log.debug("Evaluating instance for dequeue: {}", instance);
if (!config.globalEnabled()) {
LOG.debug("Scheduling disabled, sending instance back to queue: {}", instance);
stateManager.receiveIgnoreClosed(Event.retryAfter(instance, randomizedDelay()), runState.counter());
return;
}
// Get the workflow configuration
var workflowOpt = workflows.computeIfAbsent(instance.workflowId(), this::readWorkflow);
var workflowConfig = workflowOpt
.map(Workflow::configuration)
// Dummy placeholder
.orElse(WorkflowConfiguration
.builder()
.id(instance.workflowId().id())
.schedule(Schedule.parse(""))
.build());
var workflowResourceRefs = workflowResources(config.globalConcurrency().isPresent(), workflowOpt);
var instanceResourceRefs = resourceDecorator.decorateResources(
runState, workflowConfig, workflowResourceRefs);
var unknownResources = instanceResourceRefs.stream()
.filter(resourceRef -> !resources.containsKey(resourceRef))
.collect(toSet());
if (!unknownResources.isEmpty()) {
var error = Event.runError(instance, "Referenced resources not found: " + unknownResources);
stateManager.receiveIgnoreClosed(error, runState.counter());
return;
}
// Account resource demand by instances that are queued
instanceResourceRefs.forEach(currentResourceDemand::incrementAndGet);
// Check resource limits. This is racy and can give false positives but the transactional
// checking happens later. This is just intended to avoid spinning on exhausted resources.
final List<String> depletedResources = instanceResourceRefs.stream()
.filter(resourceId -> limitReached(resourceId, resourceExhaustedCache))
.sorted()
.collect(toList());
if (!depletedResources.isEmpty()) {
log.debug("Resource limit reached for instance, not dequeueing: {}: exhausted resources={}",
instance, depletedResources);
MessageUtil.emitResourceLimitReachedMessage(stateManager, runState, depletedResources);
return;
}
// Racy: some resources may have been removed (become unknown) by now; in that case the
// counters code during dequeue will treat them as unlimited...
sendDequeue(instance, runState, instanceResourceRefs);
}
@Override
public long getTotalErrorCount() {
AtomicLongMap<String> atomicLongMap = ErrorStatisticsAppender.ERROR_NAME_VALUE_MAP;
return atomicLongMap.sum();
}
public void testRemove_thread_safety() throws InterruptedException {
final AtomicLongMap<String> cache = AtomicLongMap.create();
final int totalThread = 5;
final ExecutorService executorService = Executors.newFixedThreadPool(totalThread);
final AtomicLong totalCounter = new AtomicLong();
final AtomicBoolean writerThread = new AtomicBoolean(true);
final AtomicBoolean removeThread = new AtomicBoolean(true);
final CountDownLatch writerLatch = new CountDownLatch(totalThread);
for (int i = 0; i < totalThread; i++) {
final int writerName = i;
executorService.execute(new Runnable() {
@Override
public void run() {
while (writerThread.get()) {
cache.incrementAndGet("aa");
cache.incrementAndGet("cc");
cache.incrementAndGet("aa");
cache.incrementAndGet("bb");
cache.incrementAndGet("bb");
cache.incrementAndGet("bb");
cache.incrementAndGet("cc");
cache.incrementAndGet("d");
totalCounter.addAndGet(8);
}
writerLatch.countDown();
logger.debug("shutdown {}", writerName);
}
});
}
final AtomicLong sumCounter = new AtomicLong();
executorService.execute(new Runnable() {
@Override
public void run() {
while (removeThread.get()) {
Map<String, Long> remove = AtomicLongMapUtils.remove(cache);
sumCounter.addAndGet(sum(remove));
logger.debug("sum:{}", remove);
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
}
}
});
Uninterruptibles.sleepUninterruptibly(5000, TimeUnit.MILLISECONDS);
writerThread.set(false);
writerLatch.await();
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
removeThread.set(false);
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
executorService.shutdown();
logger.debug("total={} sum:{}", totalCounter.get(), sumCounter.get());
Assert.assertEquals("concurrent remove and increment", totalCounter.get(), sumCounter.get());
}
private TestAtomicCounterMap(String name) {
// Init name, map using create
atomicCounterMapName = name;
map = AtomicLongMap.create();
}
public AtomicLongMapTutorials() {
atomicLongMap = AtomicLongMap.create();
}