下面列出了java.util.concurrent.atomic.LongAdder#sum() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Computes an events-per-second rate once more than {@link #MEASUREMENT_INTERVAL} milliseconds have elapsed since
 * the previous measurement.
 *
 * @param eventCount Accumulates the events seen during the current interval; the measured amount is subtracted from
 *        it when a new rate is produced
 * @param lastMeasurementTime Timestamp of the previous measurement; it is advanced to the current time by the thread
 *        that performs the new measurement
 * @return The number of events per second for the elapsed interval, or -1 if the interval has not been exceeded yet
 *         or another thread updated the timestamp first
 */
protected final double measureDimension(LongAdder eventCount, AtomicLong lastMeasurementTime) {
    final long currentTime = Service.currentTimeMillis();
    final long previousTime = lastMeasurementTime.get();
    final long elapsedMillis = currentTime - previousTime;

    if (elapsedMillis <= MEASUREMENT_INTERVAL) {
        return -1;
    }
    // Only the thread that wins the timestamp update is allowed to drain the event counter.
    if (!lastMeasurementTime.compareAndSet(previousTime, currentTime)) {
        return -1;
    }

    final long drained = eventCount.sum();
    // Subtract what was read instead of calling reset(), so increments arriving in between are kept.
    eventCount.add(-drained);

    final double elapsedSeconds = (double) elapsedMillis / 1000d;
    return (double) drained / elapsedSeconds;
}
/**
 * adds by multiple threads produce correct sum
 *
 * Submits {@code nthreads} AdderTask instances that share one LongAdder and one CyclicBarrier
 * sized for the workers plus this thread, then checks that the adder's sum equals
 * {@code nthreads * incs}. The pool is shut down even when the test fails.
 */
public void testAddAndSumMT() throws Throwable {
    final int incs = 1000000;
    final int nthreads = 4;
    final ExecutorService pool = Executors.newCachedThreadPool();
    try {
        LongAdder a = new LongAdder();
        CyclicBarrier barrier = new CyclicBarrier(nthreads + 1);
        for (int i = 0; i < nthreads; ++i) {
            pool.execute(new AdderTask(a, barrier, incs));
        }
        // NOTE(review): presumably each AdderTask awaits the barrier once before and once after
        // its additions, so the two awaits below bracket the work - confirm against AdderTask.
        barrier.await();
        barrier.await();
        long total = (long) nthreads * incs;
        long sum = a.sum();
        // Expected value first, actual value second, so a failure message reads correctly.
        assertEquals(total, sum);
    } finally {
        // Always release the pool, including when an await or the assertion throws.
        pool.shutdown();
    }
}
/**
 * Waits on the phaser twice, increments the adder {@code incs} times, stores the adder's
 * sum in {@code result} and finally signals arrival via {@code phaser.arrive()}.
 */
public void run() {
    phaser.arriveAndAwaitAdvance();
    phaser.arriveAndAwaitAdvance();
    final LongAdder counter = adder;
    int done = 0;
    while (done < incs) {
        counter.increment();
        ++done;
    }
    result = counter.sum();
    phaser.arrive();
}
/**
 * Passes two phaser synchronization points, performs {@code incs} increments on the adder,
 * records the resulting sum in {@code result}, then calls {@code phaser.arrive()}.
 */
public void run() {
    phaser.arriveAndAwaitAdvance();
    phaser.arriveAndAwaitAdvance();
    final LongAdder local = adder;
    for (int step = 0; step < incs; step++) {
        local.increment();
    }
    result = local.sum();
    phaser.arrive();
}
/**
 * Waits on the phaser twice, increments the adder {@code incs} times, stores the adder's
 * sum in {@code result} and finally signals arrival via {@code phaser.arrive()}.
 */
public void run() {
    phaser.arriveAndAwaitAdvance();
    phaser.arriveAndAwaitAdvance();
    final LongAdder counter = adder;
    int done = 0;
    while (done < incs) {
        counter.increment();
        ++done;
    }
    result = counter.sum();
    phaser.arrive();
}
/**
 * Passes two phaser synchronization points, performs {@code incs} increments on the adder,
 * records the resulting sum in {@code result}, then calls {@code phaser.arrive()}.
 */
public void run() {
    phaser.arriveAndAwaitAdvance();
    phaser.arriveAndAwaitAdvance();
    final LongAdder local = adder;
    for (int step = 0; step < incs; step++) {
        local.increment();
    }
    result = local.sum();
    phaser.arrive();
}
/**
 * Waits on the phaser twice, increments the adder {@code incs} times, stores the adder's
 * sum in {@code result} and finally signals arrival via {@code phaser.arrive()}.
 */
public void run() {
    phaser.arriveAndAwaitAdvance();
    phaser.arriveAndAwaitAdvance();
    final LongAdder counter = adder;
    int done = 0;
    while (done < incs) {
        counter.increment();
        ++done;
    }
    result = counter.sum();
    phaser.arrive();
}
/**
 * Polls a counter until its sum differs from {@code oldval}, then asserts that the sum
 * equals {@code newval}. The test fails if the sum is still {@code oldval} once the
 * time budget has run out.
 *
 * @param ctr counter to observe
 * @param oldval value the counter is expected to move away from
 * @param newval value the counter must have after it changed
 * @param timems maximum time to keep polling, in milliseconds
 * @throws InterruptedException if the thread is interrupted while sleeping between polls
 */
private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
        throws InterruptedException {
    final long start = System.currentTimeMillis();
    final long deadline = start + timems;
    for (long now = start; now < deadline; now = System.currentTimeMillis()) {
        if (ctr.sum() != oldval) {
            // The counter moved: it has to show the expected new value.
            assertEquals(newval, ctr.sum());
            return;
        }
        // Still unchanged: back off before polling again.
        Thread.sleep(100);
    }
    fail();
}
/**
 * Awaits the barrier, adds 1 to the adder {@code incs} times, stores the adder's sum in
 * {@code result} and awaits the barrier again. Any throwable is rethrown wrapped in an
 * {@link Error}.
 */
public void run() {
    try {
        barrier.await();
        final LongAdder target = adder;
        int k = 0;
        while (k < incs) {
            target.add(1L);
            ++k;
        }
        result = target.sum();
        barrier.await();
    } catch (Throwable failure) {
        throw new Error(failure);
    }
}
/**
 * Passes two phaser synchronization points, performs {@code incs} increments on the adder,
 * records the resulting sum in {@code result}, then calls {@code phaser.arrive()}.
 */
public void run() {
    phaser.arriveAndAwaitAdvance();
    phaser.arriveAndAwaitAdvance();
    final LongAdder local = adder;
    for (int step = 0; step < incs; step++) {
        local.increment();
    }
    result = local.sum();
    phaser.arrive();
}
/**
 * Waits on the phaser twice, increments the adder {@code incs} times, stores the adder's
 * sum in {@code result} and finally signals arrival via {@code phaser.arrive()}.
 */
public void run() {
    phaser.arriveAndAwaitAdvance();
    phaser.arriveAndAwaitAdvance();
    final LongAdder counter = adder;
    int done = 0;
    while (done < incs) {
        counter.increment();
        ++done;
    }
    result = counter.sum();
    phaser.arrive();
}
/**
 * Awaits the barrier, adds 1 to the adder {@code incs} times, stores the adder's sum in
 * {@code result} and awaits the barrier again. Any throwable is rethrown wrapped in an
 * {@link Error}.
 */
public void run() {
    try {
        barrier.await();
        final LongAdder target = adder;
        for (int k = 0; k < incs; k++) {
            target.add(1L);
        }
        result = target.sum();
        barrier.await();
    } catch (Throwable failure) {
        throw new Error(failure);
    }
}
/**
 * Passes two phaser synchronization points, performs {@code incs} increments on the adder,
 * records the resulting sum in {@code result}, then calls {@code phaser.arrive()}.
 */
public void run() {
    phaser.arriveAndAwaitAdvance();
    phaser.arriveAndAwaitAdvance();
    final LongAdder local = adder;
    for (int step = 0; step < incs; step++) {
        local.increment();
    }
    result = local.sum();
    phaser.arrive();
}
/**
 * Waits on the phaser twice, increments the adder {@code incs} times, stores the adder's
 * sum in {@code result} and finally signals arrival via {@code phaser.arrive()}.
 */
public void run() {
    phaser.arriveAndAwaitAdvance();
    phaser.arriveAndAwaitAdvance();
    final LongAdder counter = adder;
    int done = 0;
    while (done < incs) {
        counter.increment();
        ++done;
    }
    result = counter.sum();
    phaser.arrive();
}
/**
 * Passes two phaser synchronization points, performs {@code incs} increments on the adder,
 * records the resulting sum in {@code result}, then calls {@code phaser.arrive()}.
 */
public void run() {
    phaser.arriveAndAwaitAdvance();
    phaser.arriveAndAwaitAdvance();
    final LongAdder local = adder;
    for (int step = 0; step < incs; step++) {
        local.increment();
    }
    result = local.sum();
    phaser.arrive();
}
/**
 * Reads the counter stored at {@code MQTT_STAT_COUNTER_INDEX}.
 *
 * @param reset when {@code true} the counter is read with {@code sumThenReset()},
 *        otherwise with {@code sum()} and left untouched
 * @return the counter value that was read
 */
public long getTotalMqttCounter(boolean reset) {
    final LongAdder mqttCounter = specificCounters[MQTT_STAT_COUNTER_INDEX];
    if (reset) {
        return mqttCounter.sumThenReset();
    }
    return mqttCounter.sum();
}
/**
 * Sends {@code NUMBER_OF_MESSAGES} messages to a queue while a separate thread consumes
 * them, and verifies that the consumer thread finished without recording an error.
 */
@Test(timeout = 300_000)
public void testFlowControl() throws Exception {
    final AtomicInteger errors = new AtomicInteger(0);
    final String queueName = getQueueName();
    JmsConnectionFactory connectionFactory =
        new JmsConnectionFactory(
            "amqp://localhost:5672?jms.prefetchPolicy.all=1&jms.connectTimeout=60000&amqp.drainTimeout=1000");
    LongAdder sendCount = new LongAdder();
    LongAdder consumeCount = new LongAdder();
    Thread consumerThread =
        new Thread(
            () -> {
                try (JMSContext listenerContext =
                         connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) {
                    try (JMSConsumer consumer =
                             listenerContext.createConsumer(
                                 listenerContext.createQueue(queueName))) {
                        // Outer loop: keep polling until interrupted.
                        while (!Thread.interrupted()) {
                            // Inner loop: drain what is currently available without blocking.
                            while (true) {
                                if (consumer.receiveNoWait() == null) {
                                    break;
                                }
                                consumeCount.increment();
                                if (consumeCount.sum() == NUMBER_OF_MESSAGES) {
                                    // Everything has been consumed; end the thread.
                                    return;
                                }
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace(System.out);
                    // Surface consumer failures to the assertion at the end of the test.
                    errors.incrementAndGet();
                }
            });
    consumerThread.start();
    try (JMSContext context = connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) {
        final Message message = context.createMessage();
        message.setStringProperty("selector", "dude");
        JMSProducer producer = context.createProducer();
        Queue queue = context.createQueue(queueName);
        while (sendCount.sum() < NUMBER_OF_MESSAGES && !Thread.interrupted()) {
            producer.send(queue, message);
            sendCount.increment();
        }
    }
    consumerThread.join();
    Assert.assertEquals(0, errors.get());
}
/**
 * Decides whether a shard request for global statistics has to be sent for this query.
 *
 * <p>If field or term statistics were recorded as missing by earlier requests, the full
 * request is built and the missing entries are attached to it. Otherwise the query is
 * checked locally, and the request is skipped when nothing is found to be missing.
 *
 * @param rb the response builder of the current request
 * @return the shard request to send, or {@code null} when the local check found no
 *         missing statistics
 */
@Override
protected ShardRequest doRetrieveStatsRequest(ResponseBuilder rb) {
    // check approximately what terms are needed.
    // NOTE: query rewrite only expands to terms that are present in the local index
    // so it's possible that the result will contain less terms than present in all shards.
    // HOWEVER: the absence of these terms is recorded by LRUStatsSource, and they will be
    // force-fetched on next request and cached.
    // check for missing stats from previous requests
    // Either kind of pending miss (field stats OR term stats) must force a fetch.
    if (!missingColStats.isEmpty() || !missingTermStats.isEmpty()) {
        // needs to fetch anyway, so get the full query stats + the missing stats for caching
        ShardRequest sreq = super.doRetrieveStatsRequest(rb);
        if (!missingColStats.isEmpty()) {
            Set<String> requestColStats = missingColStats;
            // there's a small window when new items may be added before
            // creating the request and clearing, so don't clear - instead replace the instance
            missingColStats = ConcurrentHashMap.newKeySet();
            sreq.params.add(FIELDS_KEY, StatsUtil.fieldsToString(requestColStats));
        }
        if (!missingTermStats.isEmpty()) {
            Set<Term> requestTermStats = missingTermStats;
            // same replace-instead-of-clear approach as for the field stats
            missingTermStats = ConcurrentHashMap.newKeySet();
            sreq.params.add(TERMS_KEY, StatsUtil.termsToEncodedString(requestTermStats));
        }
        return sreq;
    }
    // rewrite locally to see if there are any missing terms. See the note above for caveats.
    LongAdder missing = new LongAdder();
    try {
        // use ignorableMetrics to avoid counting this checking as real misses
        approxCheckMissingStats(rb, new LRUStatsSource(ignorableMetrics), t -> missing.increment(), f -> missing.increment());
        if (missing.sum() == 0) {
            // it should be (approximately) ok to skip the fetching
            // since we already incremented the stats decrement it here
            statsCacheMetrics.retrieveStats.decrement();
            statsCacheMetrics.useCachedGlobalStats.increment();
            return null;
        } else {
            return super.doRetrieveStatsRequest(rb);
        }
    } catch (IOException e) {
        log.warn("Exception checking missing stats for query {}, forcing retrieving stats", rb.getQuery(), e);
        // retrieve anyway
        return super.doRetrieveStatsRequest(rb);
    }
}
/**
 * Reads the counter stored at {@code APP_STAT_COUNTER_INDEX}.
 *
 * @param reset when {@code true} the counter is read with {@code sumThenReset()},
 *        otherwise with {@code sum()} and left untouched
 * @return the counter value that was read
 */
public long getTotalAppCounter(boolean reset) {
    final LongAdder appCounter = specificCounters[APP_STAT_COUNTER_INDEX];
    if (reset) {
        return appCounter.sumThenReset();
    }
    return appCounter.sum();
}
/**
 * Builds a statistics snapshot from the global command counters, the current sessions and
 * the known users.
 *
 * @param sessionDao source of the user sessions that are inspected for connection state
 * @param userDao source of the users used for the activity and registration figures
 * @param blockingIOProcessor passed on to the {@code BlockingIOStat} created here
 * @param globalStats holder of the per-command counters and the message rate meter
 * @param reportScheduler passed on to the {@code BlockingIOStat} created here
 * @param reset when {@code true} the counters are read with {@code sumThenReset()},
 *        otherwise with {@code sum()}
 */
public Stat(SessionDao sessionDao, UserDao userDao, BlockingIOProcessor blockingIOProcessor,
            GlobalStats globalStats, ReportScheduler reportScheduler, boolean reset) {
    // Updates racing with sumThenReset() can get lost; that is accepted because these
    // numbers only serve general monitoring.
    for (Short command : Command.VALUES_NAME.keySet()) {
        LongAdder counter = globalStats.specificCounters[command];
        long raw;
        if (reset) {
            raw = counter.sumThenReset();
        } else {
            raw = counter.sum();
        }
        int count = (int) raw;
        this.http.assign(command, count);
        this.commands.assign(command, count);
    }

    this.commands.appTotal = (int) globalStats.getTotalAppCounter(reset);
    this.commands.mqttTotal = (int) globalStats.getTotalMqttCounter(reset);
    this.oneMinRate = (int) globalStats.totalMessages.getOneMinuteRate();

    this.ts = System.currentTimeMillis();

    int bothConnected = 0;
    int sessionsWithHardware = 0;
    int hardwareChannelTotal = 0;
    int sessionsWithApp = 0;
    int appChannelTotal = 0;
    int dailyActive = 0;
    int weeklyActive = 0;
    int monthlyActive = 0;

    for (Map.Entry<UserKey, Session> entry : sessionDao.userSession.entrySet()) {
        Session session = entry.getValue();

        if (session.isHardwareConnected() && session.isAppConnected()) {
            bothConnected += 1;
        }
        if (session.isHardwareConnected()) {
            sessionsWithHardware += 1;
            hardwareChannelTotal += session.hardwareChannels.size();
        }
        if (session.isAppConnected()) {
            sessionsWithApp += 1;
            appChannelTotal += session.appChannels.size();
        }

        User user = userDao.users.get(entry.getKey());
        if (user == null) {
            continue;
        }
        // A user counts for the narrowest window that matches and for every wider one.
        if (this.ts - user.lastModifiedTs < ONE_DAY || dashUpdated(user, this.ts, ONE_DAY)) {
            dailyActive += 1;
            weeklyActive += 1;
            monthlyActive += 1;
        } else if (this.ts - user.lastModifiedTs < ONE_WEEK || dashUpdated(user, this.ts, ONE_WEEK)) {
            weeklyActive += 1;
            monthlyActive += 1;
        } else if (this.ts - user.lastModifiedTs < ONE_MONTH || dashUpdated(user, this.ts, ONE_MONTH)) {
            monthlyActive += 1;
        }
    }

    this.connected = bothConnected;
    this.onlineApps = sessionsWithApp;
    this.totalOnlineApps = appChannelTotal;
    this.onlineHards = sessionsWithHardware;
    this.totalOnlineHards = hardwareChannelTotal;
    this.active = dailyActive;
    this.activeWeek = weeklyActive;
    this.activeMonth = monthlyActive;
    this.registrations = userDao.users.size();
    this.ioStat = new BlockingIOStat(blockingIOProcessor, reportScheduler);
    this.memoryStat = new MemoryStat(ByteBufAllocator.DEFAULT);
}