下面列出了怎么用 java.util.concurrent.atomic.LongAccumulator 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。
/**
 * Demonstrates {@link LongAccumulator} as a concurrent running maximum:
 * 1000 threads each contribute one random {@code long}, and the largest
 * contributed value is printed once every thread has finished.
 *
 * @param args ignored
 * @throws InterruptedException if the calling thread is interrupted while joining a worker
 */
public static void main(String[] args) throws InterruptedException {
    final LongAccumulator maxSeen = new LongAccumulator(Long::max, Long.MIN_VALUE);
    // The task captures only the shared accumulator, so one instance can serve every thread.
    final Runnable contributeRandom = () -> maxSeen.accumulate(new Random().nextLong());

    final Thread[] workers = new Thread[1000];
    for (int idx = 0; idx < workers.length; idx++) {
        final Thread worker = new Thread(contributeRandom);
        workers[idx] = worker;
        worker.start();
    }

    // Joining every worker ensures all contributions are visible before the value is read.
    for (Thread worker : workers) {
        worker.join();
    }

    System.out.println(maxSeen.longValue());
}
/**
 * Starts 100 threads that each feed one random {@code long} into a shared
 * max-{@link LongAccumulator}, waits for all of them, and prints the
 * largest value that was fed in.
 *
 * @param args ignored
 * @throws Exception if joining a worker thread fails (e.g. interruption)
 */
public static void main(String[] args) throws Exception {
    final LongAccumulator maxSeen = new LongAccumulator(Long::max, Long.MIN_VALUE);
    final Thread[] workers = new Thread[100];

    int slot = 0;
    while (slot < workers.length) {
        workers[slot] = new Thread(() -> {
            final long sample = new Random().nextLong();
            maxSeen.accumulate(sample);
        });
        workers[slot].start();
        slot++;
    }

    // Wait for every worker so the printed value reflects all samples.
    for (Thread worker : workers) {
        worker.join();
    }

    System.out.println(maxSeen.longValue());
}
/**
 * Accumulating concurrently from several threads produces the correct result.
 *
 * <p>Submits {@code nthreads} {@code AccTask}s that share one
 * max-accumulator and one {@link Phaser}, advances through two phases
 * together with them, and then checks the accumulated maximum. The pool is
 * shut down in a {@code finally} block so that a failing assertion does not
 * leave its worker threads behind.
 */
public void testAccumulateAndGetMT() {
    final int incs = 1000000;
    final int nthreads = 4;
    final ExecutorService pool = Executors.newCachedThreadPool();
    try {
        LongAccumulator a = new LongAccumulator(Long::max, 0L);
        // The workers plus this thread are the registered parties.
        Phaser phaser = new Phaser(nthreads + 1);
        for (int i = 0; i < nthreads; ++i) {
            pool.execute(new AccTask(a, phaser, incs));
        }
        // NOTE(review): presumably the first advance releases the tasks and the
        // second one waits for them to finish — confirm against AccTask.run().
        phaser.arriveAndAwaitAdvance();
        phaser.arriveAndAwaitAdvance();
        // Assumes each task accumulates 0..incs-1, making incs-1 the maximum.
        long expected = incs - 1;
        long result = a.get();
        assertEquals(expected, result);
    } finally {
        pool.shutdown();
    }
}
/**
 * Computes the maximum of 100 random {@code long} values, each produced on
 * its own thread and merged through a shared {@link LongAccumulator}, then
 * prints that maximum.
 *
 * @param args ignored
 * @throws Exception if waiting for a worker thread fails (e.g. interruption)
 */
public static void main(String[] args) throws Exception {
    final int threadCount = 100;
    final LongAccumulator runningMax = new LongAccumulator(Long::max, Long.MIN_VALUE);
    // Each run draws one fresh random value and merges it into the shared maximum.
    final Runnable mergeOneSample = () -> runningMax.accumulate(new Random().nextLong());

    final Thread[] pool = new Thread[threadCount];
    for (int n = 0; n < threadCount; n++) {
        final Thread t = new Thread(mergeOneSample);
        pool[n] = t;
        t.start();
    }

    for (int n = 0; n < threadCount; n++) {
        pool[n].join();
    }

    final long max = runningMax.longValue();
    System.out.println(max);
}
/**
 * Accumulating concurrently from several threads produces the correct result.
 *
 * <p>Runs {@code nthreads} {@code AccTask}s against a single
 * max-accumulator, synchronizing with them through a {@link Phaser}, and
 * verifies the final maximum. Pool shutdown happens in {@code finally} so
 * the executor is released even when the assertion fails.
 */
public void testAccumulateAndGetMT() {
    final int incs = 1000000;
    final int nthreads = 4;
    final ExecutorService pool = Executors.newCachedThreadPool();
    try {
        LongAccumulator a = new LongAccumulator(Long::max, 0L);
        // One party per worker, plus one for this thread.
        Phaser phaser = new Phaser(nthreads + 1);
        for (int i = 0; i < nthreads; ++i) {
            pool.execute(new AccTask(a, phaser, incs));
        }
        // NOTE(review): presumably a start barrier followed by a completion
        // barrier — confirm against AccTask.run().
        phaser.arriveAndAwaitAdvance();
        phaser.arriveAndAwaitAdvance();
        // Assumes every task feeds 0..incs-1 into the accumulator.
        long expected = incs - 1;
        long result = a.get();
        assertEquals(expected, result);
    } finally {
        pool.shutdown();
    }
}
/**
 * Constructor to initialize QueryScheduler
 * @param config configuration; read here for the query log max rate
 *               ({@code QUERY_LOG_MAX_RATE_KEY}, falling back to {@code DEFAULT_QUERY_LOG_MAX_RATE})
 * @param queryExecutor QueryExecutor engine to use
 * @param resourceManager for managing server thread resources
 * @param serverMetrics server metrics collector
 * @param latestQueryTime accumulator stored on this scheduler; presumably tracks the
 *                        time of the most recent query — confirm against its usages
 * @throws NullPointerException if any argument is null
 */
public QueryScheduler(@Nonnull Configuration config, @Nonnull QueryExecutor queryExecutor,
    @Nonnull ResourceManager resourceManager, @Nonnull ServerMetrics serverMetrics,
    @Nonnull LongAccumulator latestQueryTime) {
  Preconditions.checkNotNull(config);
  Preconditions.checkNotNull(queryExecutor);
  Preconditions.checkNotNull(resourceManager);
  Preconditions.checkNotNull(serverMetrics);
  // Checked like every other @Nonnull argument so a null fails here rather than at first use.
  Preconditions.checkNotNull(latestQueryTime);
  this.serverMetrics = serverMetrics;
  this.resourceManager = resourceManager;
  this.queryExecutor = queryExecutor;
  this.latestQueryTime = latestQueryTime;
  this.queryLogRateLimiter = RateLimiter.create(config.getDouble(QUERY_LOG_MAX_RATE_KEY, DEFAULT_QUERY_LOG_MAX_RATE));
  // The dropped-log rate limiter is created with a fixed rate of 1.0.
  this.numDroppedLogRateLimiter = RateLimiter.create(1.0d);
  this.numDroppedLogCounter = new AtomicInteger(0);
  LOGGER.info("Query log max rate: {}", queryLogRateLimiter.getRate());
}
/**
 * Four tasks each add the integers 0..100 into a shared sum-accumulator; the
 * final value must be 4 * 5050 = 20200.
 *
 * @throws InterruptedException if interrupted while waiting for the executor to terminate
 */
@Test
public void givenLongAccumulator_whenApplyActionOnItFromMultipleThrads_thenShouldProduceProperResult() throws InterruptedException {
    // given
    ExecutorService executorService = Executors.newFixedThreadPool(8);
    LongBinaryOperator sum = Long::sum;
    LongAccumulator accumulator = new LongAccumulator(sum, 0L);
    int numberOfThreads = 4;
    int numberOfIncrements = 100;

    // when
    Runnable accumulateAction = () -> IntStream.rangeClosed(0, numberOfIncrements).forEach(accumulator::accumulate);
    for (int i = 0; i < numberOfThreads; i++) {
        executorService.execute(accumulateAction);
    }

    // then
    // shutdown() must come first: awaitTermination() only returns early once a
    // shutdown has been requested and all submitted tasks have completed.
    executorService.shutdown();
    executorService.awaitTermination(500, TimeUnit.MILLISECONDS);
    // Expected value first, actual second, so a failure message reads correctly.
    assertEquals(20200, accumulator.get());
}
/**
 * Passes a {@link LongAccumulator} through the external {@code echo} helper
 * (presumably a serialize/deserialize round trip — confirm) and verifies that
 * the returned instance reports the same value as the source, both directly
 * afterwards and after both have been reset. Finally hands the source to
 * {@code checkSerialClassName} together with the expected proxy class name.
 *
 * @throws RuntimeException if the two accumulators report different values
 */
static void testLongAccumulator() {
    final LongBinaryOperator adder =
        (LongBinaryOperator & Serializable) (left, right) -> left + right;
    final LongAccumulator source = new LongAccumulator(adder, -2);
    source.accumulate(34);

    final LongAccumulator copy = echo(source);
    final boolean sameBeforeReset = copy.get() == source.get();
    if (!sameBeforeReset) {
        throw new RuntimeException("Unexpected value");
    }

    source.reset();
    copy.reset();
    final boolean sameAfterReset = copy.get() == source.get();
    if (!sameAfterReset) {
        throw new RuntimeException("Unexpected value after reset");
    }

    checkSerialClassName(source, "java.util.concurrent.atomic.LongAccumulator$SerializationProxy");
}
/**
 * Sends a {@link LongAccumulator} through the external {@code echo} helper
 * (presumably serialization and back — confirm) and checks that the echoed
 * instance agrees with the original before and after resetting both, then
 * calls {@code checkSerialClassName} with the expected proxy class name.
 *
 * @throws RuntimeException if the echoed and original values differ
 */
static void testLongAccumulator() {
    final LongBinaryOperator addition =
        (LongBinaryOperator & Serializable) (lhs, rhs) -> lhs + rhs;
    final LongAccumulator original = new LongAccumulator(addition, -2);
    original.accumulate(34);
    final LongAccumulator echoed = echo(original);

    long echoedValue = echoed.get();
    long originalValue = original.get();
    if (echoedValue != originalValue) {
        throw new RuntimeException("Unexpected value");
    }

    original.reset();
    echoed.reset();

    echoedValue = echoed.get();
    originalValue = original.get();
    if (echoedValue != originalValue) {
        throw new RuntimeException("Unexpected value after reset");
    }

    checkSerialClassName(original, "java.util.concurrent.atomic.LongAccumulator$SerializationProxy");
}
/**
 * Passes a {@link LongAccumulator} through the external {@code echo} helper
 * (presumably a serialize/deserialize round trip — confirm) and verifies that
 * the returned instance reports the same value as the source, both directly
 * afterwards and after both have been reset. Finally hands the source to
 * {@code checkSerialClassName} together with the expected proxy class name.
 *
 * @throws RuntimeException if the two accumulators report different values
 */
static void testLongAccumulator() {
    final LongBinaryOperator adder =
        (LongBinaryOperator & Serializable) (left, right) -> left + right;
    final LongAccumulator source = new LongAccumulator(adder, -2);
    source.accumulate(34);

    final LongAccumulator copy = echo(source);
    final boolean sameBeforeReset = copy.get() == source.get();
    if (!sameBeforeReset) {
        throw new RuntimeException("Unexpected value");
    }

    source.reset();
    copy.reset();
    final boolean sameAfterReset = copy.get() == source.get();
    if (!sameAfterReset) {
        throw new RuntimeException("Unexpected value after reset");
    }

    checkSerialClassName(source, "java.util.concurrent.atomic.LongAccumulator$SerializationProxy");
}
/**
 * Sends a {@link LongAccumulator} through the external {@code echo} helper
 * (presumably serialization and back — confirm) and checks that the echoed
 * instance agrees with the original before and after resetting both, then
 * calls {@code checkSerialClassName} with the expected proxy class name.
 *
 * @throws RuntimeException if the echoed and original values differ
 */
static void testLongAccumulator() {
    final LongBinaryOperator addition =
        (LongBinaryOperator & Serializable) (lhs, rhs) -> lhs + rhs;
    final LongAccumulator original = new LongAccumulator(addition, -2);
    original.accumulate(34);
    final LongAccumulator echoed = echo(original);

    long echoedValue = echoed.get();
    long originalValue = original.get();
    if (echoedValue != originalValue) {
        throw new RuntimeException("Unexpected value");
    }

    original.reset();
    echoed.reset();

    echoedValue = echoed.get();
    originalValue = original.get();
    if (echoedValue != originalValue) {
        throw new RuntimeException("Unexpected value after reset");
    }

    checkSerialClassName(original, "java.util.concurrent.atomic.LongAccumulator$SerializationProxy");
}
/**
 * Copies the mappings stored in a {@link LocatableResolver} into the given replica.
 *
 * <p>The copy itself is delegated to {@code copyInternal}; once it completes,
 * the value held by the max-accumulator passed to it is handed to
 * {@code replica.setWindow}.
 *
 * @param replica the {@link LocatableResolver} that receives the copy
 * @return a future that completes when the copy has finished
 * @throws IllegalArgumentException if {@code replica}'s database is not equal to this runner's database
 */
public CompletableFuture<Void> copyToAsync(@Nonnull final LocatableResolver replica) {
    final boolean sameDatabase = replica.getDatabase().equals(runner.getDatabase());
    if (!sameDatabase) {
        throw new IllegalArgumentException("copy must be within same database");
    }

    final AtomicInteger counter = new AtomicInteger();
    // Identity 0 with Long::max: the accumulator ends up holding the largest value offered to it.
    final LongAccumulator maxAccumulator = new LongAccumulator(Long::max, 0L);

    return copyInternal(replica, maxAccumulator, counter)
            .thenCompose(unused -> replica.setWindow(maxAccumulator.get()));
}
/**
 * A {@link LongAccumulator} is built from a lambda of type
 * {@link LongBinaryOperator}, so it is not limited to plain addition.
 *
 * <p>Submits ten tasks to a two-thread executor, each accumulating one of
 * the values 0..9 with the operator {@code 2 * x + y} and identity 1, stops
 * the executor, then prints the accumulated value and resets it.
 */
public void longAccumulatorDemo() {
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    final LongBinaryOperator doubleThenAdd = (x, y) -> 2 * x + y;
    final LongAccumulator accumulator = new LongAccumulator(doubleThenAdd, 1L);

    for (int i = 0; i < 10; i++) {
        final int value = i;
        executor.submit(() -> accumulator.accumulate(value));
    }

    SynchronizationLocks.stopExecutor(executor);

    // Sample output noted by the original author: 2539.
    // NOTE(review): the operator is order-dependent, and LongAccumulator does not
    // guarantee accumulation order, so this value may not be reproducible.
    final long total = accumulator.getThenReset();
    System.out.println(total);
}
/**
 * Passes a {@link LongAccumulator} through the external {@code echo} helper
 * (presumably a serialize/deserialize round trip — confirm) and verifies that
 * the returned instance reports the same value as the source, both directly
 * afterwards and after both have been reset. Finally hands the source to
 * {@code checkSerialClassName} together with the expected proxy class name.
 *
 * @throws RuntimeException if the two accumulators report different values
 */
static void testLongAccumulator() {
    final LongBinaryOperator adder =
        (LongBinaryOperator & Serializable) (left, right) -> left + right;
    final LongAccumulator source = new LongAccumulator(adder, -2);
    source.accumulate(34);

    final LongAccumulator copy = echo(source);
    final boolean sameBeforeReset = copy.get() == source.get();
    if (!sameBeforeReset) {
        throw new RuntimeException("Unexpected value");
    }

    source.reset();
    copy.reset();
    final boolean sameAfterReset = copy.get() == source.get();
    if (!sameAfterReset) {
        throw new RuntimeException("Unexpected value after reset");
    }

    checkSerialClassName(source, "java.util.concurrent.atomic.LongAccumulator$SerializationProxy");
}
/**
 * Sends a {@link LongAccumulator} through the external {@code echo} helper
 * (presumably serialization and back — confirm) and checks that the echoed
 * instance agrees with the original before and after resetting both, then
 * calls {@code checkSerialClassName} with the expected proxy class name.
 *
 * @throws RuntimeException if the echoed and original values differ
 */
static void testLongAccumulator() {
    final LongBinaryOperator addition =
        (LongBinaryOperator & Serializable) (lhs, rhs) -> lhs + rhs;
    final LongAccumulator original = new LongAccumulator(addition, -2);
    original.accumulate(34);
    final LongAccumulator echoed = echo(original);

    long echoedValue = echoed.get();
    long originalValue = original.get();
    if (echoedValue != originalValue) {
        throw new RuntimeException("Unexpected value");
    }

    original.reset();
    echoed.reset();

    echoedValue = echoed.get();
    originalValue = original.get();
    if (echoedValue != originalValue) {
        throw new RuntimeException("Unexpected value after reset");
    }

    checkSerialClassName(original, "java.util.concurrent.atomic.LongAccumulator$SerializationProxy");
}
/**
 * Passes a {@link LongAccumulator} through the external {@code echo} helper
 * (presumably a serialize/deserialize round trip — confirm) and verifies that
 * the returned instance reports the same value as the source, both directly
 * afterwards and after both have been reset. Finally hands the source to
 * {@code checkSerialClassName} together with the expected proxy class name.
 *
 * @throws RuntimeException if the two accumulators report different values
 */
static void testLongAccumulator() {
    final LongBinaryOperator adder =
        (LongBinaryOperator & Serializable) (left, right) -> left + right;
    final LongAccumulator source = new LongAccumulator(adder, -2);
    source.accumulate(34);

    final LongAccumulator copy = echo(source);
    final boolean sameBeforeReset = copy.get() == source.get();
    if (!sameBeforeReset) {
        throw new RuntimeException("Unexpected value");
    }

    source.reset();
    copy.reset();
    final boolean sameAfterReset = copy.get() == source.get();
    if (!sameAfterReset) {
        throw new RuntimeException("Unexpected value after reset");
    }

    checkSerialClassName(source, "java.util.concurrent.atomic.LongAccumulator$SerializationProxy");
}
/**
 * accumulate() folds the given value into the current one, and get()
 * reports the current value. With {@code Long::max} the value only changes
 * when a larger input arrives.
 */
public void testAccumulateAndGet() {
    final LongAccumulator max = new LongAccumulator(Long::max, 0L);
    // Input fed to accumulate(), paired with the value get() must report afterwards.
    final long[] inputs = {2, -4, 4};
    final long[] expectedAfter = {2, 2, 4};

    for (int step = 0; step < inputs.length; step++) {
        max.accumulate(inputs[step]);
        assertEquals(expectedAfter[step], max.get());
    }
}
/**
 * After reset(), get() reports the identity the accumulator was created
 * with (zero here).
 */
public void testReset() {
    final LongAccumulator max = new LongAccumulator(Long::max, 0L);

    max.accumulate(2);
    final long beforeReset = max.get();
    assertEquals(2, beforeReset);

    max.reset();
    final long afterReset = max.get();
    assertEquals(0, afterReset);
}
/**
 * getThenReset() hands back the current value; a get() issued afterwards
 * reports the identity (zero here).
 */
public void testGetThenReset() {
    final LongAccumulator max = new LongAccumulator(Long::max, 0L);
    max.accumulate(2);

    final long observed = max.get();
    assertEquals(2, observed);

    final long drained = max.getThenReset();
    assertEquals(2, drained);

    final long afterDrain = max.get();
    assertEquals(0, afterDrain);
}
/**
 * toString() renders the current value as a decimal string.
 */
public void testToString() {
    final LongAccumulator max = new LongAccumulator(Long::max, 0L);

    final String initialText = max.toString();
    assertEquals("0", initialText);

    max.accumulate(1);
    final String expectedText = Long.toString(1);
    final String updatedText = max.toString();
    assertEquals(expectedText, updatedText);
}
/**
 * intValue() reports the current value as an {@code int}.
 */
public void testIntValue() {
    final LongAccumulator max = new LongAccumulator(Long::max, 0L);

    final int initial = max.intValue();
    assertEquals(0, initial);

    max.accumulate(1);
    final int updated = max.intValue();
    assertEquals(1, updated);
}
/**
 * longValue() reports the current value as a {@code long}.
 */
public void testLongValue() {
    final LongAccumulator max = new LongAccumulator(Long::max, 0L);

    final long initial = max.longValue();
    assertEquals(0, initial);

    max.accumulate(1);
    final long updated = max.longValue();
    assertEquals(1, updated);
}
/**
 * floatValue() reports the current value as a {@code float}.
 */
public void testFloatValue() {
    final LongAccumulator max = new LongAccumulator(Long::max, 0L);

    final float initial = max.floatValue();
    assertEquals(0.0f, initial);

    max.accumulate(1);
    final float updated = max.floatValue();
    assertEquals(1.0f, updated);
}
/**
 * doubleValue() reports the current value as a {@code double}.
 */
public void testDoubleValue() {
    final LongAccumulator max = new LongAccumulator(Long::max, 0L);

    final double initial = max.doubleValue();
    assertEquals(0.0, initial);

    max.accumulate(1);
    final double updated = max.doubleValue();
    assertEquals(1.0, updated);
}
/**
 * Waits on the phaser, accumulates every integer in {@code [0, incs)} into
 * the accumulator, records the accumulator's value in {@code result}, and
 * then arrives at the phaser without waiting.
 */
public void run() {
    // Block until the current phase advances before doing any work.
    phaser.arriveAndAwaitAdvance();

    // Work through a local reference rather than re-reading the field each iteration.
    final LongAccumulator target = acc;
    int next = 0;
    while (next < incs) {
        target.accumulate(next);
        ++next;
    }

    result = target.get();
    // Signal completion; unlike the call above, this does not wait for other parties.
    phaser.arrive();
}
/**
 * Sends a {@link LongAccumulator} through the external {@code echo} helper
 * (presumably serialization and back — confirm) and checks that the echoed
 * instance agrees with the original before and after resetting both, then
 * calls {@code checkSerialClassName} with the expected proxy class name.
 *
 * @throws RuntimeException if the echoed and original values differ
 */
static void testLongAccumulator() {
    final LongBinaryOperator addition =
        (LongBinaryOperator & Serializable) (lhs, rhs) -> lhs + rhs;
    final LongAccumulator original = new LongAccumulator(addition, -2);
    original.accumulate(34);
    final LongAccumulator echoed = echo(original);

    long echoedValue = echoed.get();
    long originalValue = original.get();
    if (echoedValue != originalValue) {
        throw new RuntimeException("Unexpected value");
    }

    original.reset();
    echoed.reset();

    echoedValue = echoed.get();
    originalValue = original.get();
    if (echoedValue != originalValue) {
        throw new RuntimeException("Unexpected value after reset");
    }

    checkSerialClassName(original, "java.util.concurrent.atomic.LongAccumulator$SerializationProxy");
}
/**
 * Passes a {@link LongAccumulator} through the external {@code echo} helper
 * (presumably a serialize/deserialize round trip — confirm) and verifies that
 * the returned instance reports the same value as the source, both directly
 * afterwards and after both have been reset. Finally hands the source to
 * {@code checkSerialClassName} together with the expected proxy class name.
 *
 * @throws RuntimeException if the two accumulators report different values
 */
static void testLongAccumulator() {
    final LongBinaryOperator adder =
        (LongBinaryOperator & Serializable) (left, right) -> left + right;
    final LongAccumulator source = new LongAccumulator(adder, -2);
    source.accumulate(34);

    final LongAccumulator copy = echo(source);
    final boolean sameBeforeReset = copy.get() == source.get();
    if (!sameBeforeReset) {
        throw new RuntimeException("Unexpected value");
    }

    source.reset();
    copy.reset();
    final boolean sameAfterReset = copy.get() == source.get();
    if (!sameAfterReset) {
        throw new RuntimeException("Unexpected value after reset");
    }

    checkSerialClassName(source, "java.util.concurrent.atomic.LongAccumulator$SerializationProxy");
}
/**
 * Sends a {@link LongAccumulator} through the external {@code echo} helper
 * (presumably serialization and back — confirm) and checks that the echoed
 * instance agrees with the original before and after resetting both, then
 * calls {@code checkSerialClassName} with the expected proxy class name.
 *
 * @throws RuntimeException if the echoed and original values differ
 */
static void testLongAccumulator() {
    final LongBinaryOperator addition =
        (LongBinaryOperator & Serializable) (lhs, rhs) -> lhs + rhs;
    final LongAccumulator original = new LongAccumulator(addition, -2);
    original.accumulate(34);
    final LongAccumulator echoed = echo(original);

    long echoedValue = echoed.get();
    long originalValue = original.get();
    if (echoedValue != originalValue) {
        throw new RuntimeException("Unexpected value");
    }

    original.reset();
    echoed.reset();

    echoedValue = echoed.get();
    originalValue = original.get();
    if (echoedValue != originalValue) {
        throw new RuntimeException("Unexpected value after reset");
    }

    checkSerialClassName(original, "java.util.concurrent.atomic.LongAccumulator$SerializationProxy");
}
/**
 * Passes a {@link LongAccumulator} through the external {@code echo} helper
 * (presumably a serialize/deserialize round trip — confirm) and verifies that
 * the returned instance reports the same value as the source, both directly
 * afterwards and after both have been reset. Finally hands the source to
 * {@code checkSerialClassName} together with the expected proxy class name.
 *
 * @throws RuntimeException if the two accumulators report different values
 */
static void testLongAccumulator() {
    final LongBinaryOperator adder =
        (LongBinaryOperator & Serializable) (left, right) -> left + right;
    final LongAccumulator source = new LongAccumulator(adder, -2);
    source.accumulate(34);

    final LongAccumulator copy = echo(source);
    final boolean sameBeforeReset = copy.get() == source.get();
    if (!sameBeforeReset) {
        throw new RuntimeException("Unexpected value");
    }

    source.reset();
    copy.reset();
    final boolean sameAfterReset = copy.get() == source.get();
    if (!sameAfterReset) {
        throw new RuntimeException("Unexpected value after reset");
    }

    checkSerialClassName(source, "java.util.concurrent.atomic.LongAccumulator$SerializationProxy");
}
/**
 * Sends a {@link LongAccumulator} through the external {@code echo} helper
 * (presumably serialization and back — confirm) and checks that the echoed
 * instance agrees with the original before and after resetting both, then
 * calls {@code checkSerialClassName} with the expected proxy class name.
 *
 * @throws RuntimeException if the echoed and original values differ
 */
static void testLongAccumulator() {
    final LongBinaryOperator addition =
        (LongBinaryOperator & Serializable) (lhs, rhs) -> lhs + rhs;
    final LongAccumulator original = new LongAccumulator(addition, -2);
    original.accumulate(34);
    final LongAccumulator echoed = echo(original);

    long echoedValue = echoed.get();
    long originalValue = original.get();
    if (echoedValue != originalValue) {
        throw new RuntimeException("Unexpected value");
    }

    original.reset();
    echoed.reset();

    echoedValue = echoed.get();
    originalValue = original.get();
    if (echoedValue != originalValue) {
        throw new RuntimeException("Unexpected value after reset");
    }

    checkSerialClassName(original, "java.util.concurrent.atomic.LongAccumulator$SerializationProxy");
}