java.util.concurrent.atomic.AtomicLong#incrementAndGet()源码实例Demo

下面列出了java.util.concurrent.atomic.AtomicLong#incrementAndGet() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: jetcache   文件: RefreshCacheTest.java
/**
 * Checks that entries created through each {@code computeIfAbsent} overload are refreshed:
 * right after loading, the cached value equals the computed one, and after sleeping for
 * 1.5 times the refresh interval the cached value must be different.
 *
 * <p>A 50 ms refresh policy is installed for the duration of the test. Refreshing is stopped
 * and the previous policy is restored in a {@code finally} block, so a failing assertion
 * does not leave the temporary policy on the cache.
 *
 * @param cache the cache under test
 * @throws Exception propagated from the cache calls or from {@link Thread#sleep(long)}
 */
public static void computeIfAbsentTest(Cache<Object, Object> cache) throws Exception {
    RefreshPolicy oldPolicy = cache.config().getRefreshPolicy();
    cache.config().setRefreshPolicy(RefreshPolicy.newPolicy(50, TimeUnit.MILLISECONDS));
    try {
        AtomicLong value = new AtomicLong();
        // Each call produces a new number, so a reload shows up as a changed cached value.
        Function<Object, Object> loader = k -> value.incrementAndGet();
        long t = cache.config().getRefreshPolicy().getRefreshMillis();

        Object v = cache.computeIfAbsent("k1", loader);
        Assert.assertEquals(v, cache.get("k1"));
        Thread.sleep((long) (t * 1.5));
        Assert.assertNotEquals(v, cache.get("k1"));

        v = cache.computeIfAbsent("k2", loader, false);
        Assert.assertEquals(v, cache.get("k2"));
        Thread.sleep((long) (t * 1.5));
        Assert.assertNotEquals(v, cache.get("k2"));

        v = cache.computeIfAbsent("k3", loader, false, 10, TimeUnit.SECONDS);
        Assert.assertEquals(v, cache.get("k3"));
        Thread.sleep((long) (t * 1.5));
        Assert.assertNotEquals(v, cache.get("k3"));
    } finally {
        // Always undo the temporary configuration, even when an assertion above failed.
        getRefreshCache(cache).stopRefresh();
        cache.config().setRefreshPolicy(oldPolicy);
    }
}
 
源代码2 项目: spectator   文件: PolledMeterTest.java
/**
 * Registers a monotonic-counter monitor for the same id ten times, bumping the source value
 * once per round and removing the monitor again, then expects the registry counter to
 * report a total of 10.
 */
@Test
public void removeAndAddRepeatedlyCounter() {
  final Registry registry = new DefaultRegistry();
  final Id meterId = registry.createId("test");
  final AtomicLong source = new AtomicLong();

  int round = 0;
  while (round < 10) {
    PolledMeter.using(registry).withId(meterId).monitorMonotonicCounter(source);
    // Poll once before and once after the increment.
    PolledMeter.update(registry);
    source.incrementAndGet();
    PolledMeter.update(registry);
    PolledMeter.remove(registry, meterId);
    ++round;
  }

  Assertions.assertEquals(10, registry.counter("test").count());
}
 
源代码3 项目: RxJava3-preview   文件: ObservableAmbTest.java
/**
 * Subscribes to an {@code ambArray} of two delayed sources and verifies that the
 * {@code doOnSubscribe} callback fired exactly twice in total.
 */
@SuppressWarnings("unchecked")
@Test
public void testSubscriptionOnlyHappensOnce() throws InterruptedException {
    final AtomicLong subscriptions = new AtomicLong();
    // Counts every subscription that reaches either source.
    Consumer<Disposable> onSubscribeCounter = new Consumer<Disposable>() {
        @Override
        public void accept(Disposable d) {
            subscriptions.incrementAndGet();
        }
    };

    // First async source handed to amb.
    Observable<Integer> first = Observable.just(1)
            .doOnSubscribe(onSubscribeCounter)
            .delay(100, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.computation());
    // Second async source handed to amb.
    Observable<Integer> second = Observable.just(1)
            .doOnSubscribe(onSubscribeCounter)
            .delay(100, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.computation());

    TestObserver<Integer> observer = new TestObserver<Integer>();
    Observable.ambArray(first, second).subscribe(observer);

    observer.awaitTerminalEvent(5, TimeUnit.SECONDS);
    observer.assertNoErrors();
    assertEquals(2, subscriptions.get());
}
 
源代码4 项目: p4ic4idea   文件: RpcStreamConnection.java
/**
 * Keeps reading from {@code topInputStream} until {@code payloadLength} bytes of the packet
 * have been received or the stream reports end-of-stream.
 *
 * @param streamRecvs counter incremented once per successful additional read
 * @param payloadLength total number of bytes expected in {@code packetBytes}
 * @param packetBytes destination buffer; new data is appended after the bytes already read
 * @param packetBytesRead number of bytes already present in {@code packetBytes}
 * @return the total number of bytes now held in {@code packetBytes}
 * @throws IOException if the underlying read fails
 * @throws ConnectionException if a read returns a negative count
 */
private int continueReadIfIncompleteRead(@Nonnull final AtomicLong streamRecvs,
        final int payloadLength, @Nonnull final byte[] packetBytes, final int packetBytesRead)
        throws IOException, ConnectionException {
    int received = packetBytesRead;
    while (received < payloadLength) {
        // Every pass through the loop is by definition an incomplete read.
        stats.incompleteReads.incrementAndGet();

        final int stillMissing = payloadLength - received;
        final int chunk = topInputStream.read(packetBytes, received, stillMissing);
        throwConnectionExceptionIfConditionFails(chunk >= 0,
                "Perforce server network connection closed unexpectedly");

        streamRecvs.incrementAndGet();
        stats.totalBytesRecv.getAndAdd(chunk);
        received += chunk;
    }
    return received;
}
 
源代码5 项目: p4ic4idea   文件: RpcStreamConnection.java
/**
 * Completes a partially read preamble by reading from {@code topInputStream} until the
 * buffer is full. Nothing is read when {@code bytesRead} is negative or the buffer is
 * already full.
 *
 * @param preambleBytes buffer to fill; its length is the number of bytes wanted
 * @param bytesRead number of bytes already read into {@code preambleBytes}
 * @param streamRecvs counter incremented once per successful additional read
 * @return the total number of bytes read, or the original {@code bytesRead} if no read ran
 * @throws IOException if the underlying read fails
 * @throws ConnectionException if a read returns a negative count
 */
private int continueReadIfGetPartialRead(@Nonnull final byte[] preambleBytes,
        final int bytesRead, @Nonnull final AtomicLong streamRecvs)
        throws IOException, ConnectionException {
    final int wanted = preambleBytes.length;
    int received = bytesRead;
    while (received >= 0 && received < wanted) {
        final int chunk = topInputStream.read(preambleBytes, received, wanted - received);
        throwConnectionExceptionIfConditionFails(chunk >= 0,
                "server connection unexpectedly closed");

        streamRecvs.incrementAndGet();
        received += chunk;
    }
    return received;
}
 
源代码6 项目: Jaffree   文件: FFmpegTest.java
/**
 * Runs FFmpeg on the sample input with a progress listener attached and checks that a
 * result is returned and that the listener was called at least once.
 */
@Test
public void testProgress() throws Exception {
    final AtomicLong progressEvents = new AtomicLong();
    // Only counts the callbacks; the progress payload itself is not inspected.
    ProgressListener countingListener = new ProgressListener() {
        @Override
        public void onProgress(FFmpegProgress progress) {
            progressEvents.incrementAndGet();
        }
    };

    Path workDir = Files.createTempDirectory("jaffree");
    Path target = workDir.resolve("test.mkv");

    FFmpegResult outcome = FFmpeg.atPath(BIN)
            .addInput(UrlInput.fromPath(SMALL_FLV))
            .addOutput(UrlOutput.toPath(target))
            .setProgressListener(countingListener)
            .execute();

    Assert.assertNotNull(outcome);
    Assert.assertTrue(progressEvents.get() > 0);
}
 
/**
 * Builds a {@code MessageHeaders} subclass that passes a counter-derived UUID and a
 * timestamp of {@code -1L} to the super constructor, then checks the resulting id string
 * and that the headers contain exactly one entry.
 */
@Test
public void subclassWithCustomIdAndNoTimestamp() {
	final AtomicLong sequence = new AtomicLong();
	@SuppressWarnings("serial")
	class SequencedIdHeaders extends MessageHeaders {
		public SequencedIdHeaders() {
			// The low 64 bits of the UUID come from the counter, so the first id ends in ...0001.
			super(null, new UUID(0, sequence.incrementAndGet()), -1L);
		}
	}

	MessageHeaders built = new SequencedIdHeaders();
	String renderedId = built.getId().toString();

	assertEquals("00000000-0000-0000-0000-000000000001", renderedId);
	assertEquals(1, built.size());
}
 
源代码8 项目: nd4j   文件: EndlessWorkspaceTests.java
/**
 * This test checks for allocation from workspace AND spills.
 *
 * <p>It sets a default workspace configuration with an initial size of 10 * 1024 * 1024,
 * switches periodic GC off, and then loops forever: each iteration activates a workspace,
 * creates two arrays inside it, and every 1000th iteration logs the two allocation times
 * and requests a GC. The loop has no exit condition, so the method only ends when an
 * exception or assertion failure is thrown.
 *
 * @throws Exception declared for anything thrown inside the loop
 */
@Test
public void endlessTest2() throws Exception {
    Nd4j.getWorkspaceManager().setDefaultWorkspaceConfiguration(
                    WorkspaceConfiguration.builder().initialSize(10 * 1024L * 1024L).build());

    Nd4j.getMemoryManager().togglePeriodicGc(false);

    // Iteration counter, used only to throttle logging and the explicit GC below.
    AtomicLong counter = new AtomicLong(0);
    while (true) {
        // The workspace is closed at the end of every iteration by try-with-resources.
        try (MemoryWorkspace workspace = Nd4j.getWorkspaceManager().getAndActivateWorkspace()) {
            // time1..time2 brackets the first allocation.
            long time1 = System.nanoTime();
            INDArray array = Nd4j.create(2 * 1024 * 1024);
            long time2 = System.nanoTime();
            // A freshly created array plus 1 should average 1 (assumes create() zero-fills — TODO confirm).
            array.addi(1.0f);
            assertEquals(1.0f, array.meanNumber().floatValue(), 0.1f);

            // time3..time4 brackets the second allocation.
            // NOTE(review): presumably this is the allocation that spills out of the workspace — confirm.
            long time3 = System.nanoTime();
            INDArray array2 = Nd4j.create(3 * 1024 * 1024);
            long time4 = System.nanoTime();

            if (counter.incrementAndGet() % 1000 == 0) {
                log.info("{} iterations passed... Allocation time: {} vs {} (ns)", counter.get(), time2 - time1,
                                time4 - time3);
                System.gc();
            }
        }
    }
}
 
源代码9 项目: spectator   文件: SwapMeterTest.java
/**
 * Checks that a {@code SwapCounter} reports itself as expired once the supplied version
 * value changes, and that incrementing it afterwards clears the expired state again.
 */
@Test
public void versionUpdateExpiration() {
  final AtomicLong currentVersion = new AtomicLong();
  final Counter underlying = new DefaultCounter(clock, counterId);
  final SwapCounter swapCounter =
      new SwapCounter(registry, currentVersion::get, counterId, underlying);

  // Used under the initial version: not expired.
  swapCounter.increment();
  Assertions.assertFalse(swapCounter.hasExpired());

  // Changing the version makes the counter report expiry ...
  currentVersion.incrementAndGet();
  Assertions.assertTrue(swapCounter.hasExpired());

  // ... until it is used again.
  swapCounter.increment();
  Assertions.assertFalse(swapCounter.hasExpired());
}
 
源代码10 项目: immutables   文件: TypeHolder.java
/**
 * Creates a supplier of composite holders. Each call to the supplier draws the next id
 * ({@code "id1"}, {@code "id2"}, ...) from a private counter, obtains one boolean, integer
 * and string holder from their own generators, and stamps all of them with that same id.
 *
 * @return a supplier that builds a new {@code ImmutableCompositeHolder} per invocation
 */
static Supplier<ImmutableCompositeHolder> generator() {
  final AtomicLong sequence = new AtomicLong();
  final Supplier<ImmutableBooleanHolder> booleans = BooleanHolder.generator();
  final Supplier<ImmutableIntegerHolder> integers = IntegerHolder.generator();
  final Supplier<ImmutableStringHolder> strings = StringHolder.generator();

  return () -> {
    final String sharedId = "id" + sequence.incrementAndGet();
    return ImmutableCompositeHolder.builder()
        .id(sharedId)
        .booleanHolder(booleans.get().withId(sharedId))
        .integer(integers.get().withId(sharedId))
        .string(strings.get().withId(sharedId))
        .build();
  };
}
 
源代码11 项目: reactor-core   文件: FluxFilterWhenTest.java
/**
 * Subscribes with unbounded demand to a {@code filterWhen} flux and cancels from inside the
 * first {@code onNext}. Expects exactly one delivered element and a {@code CANCEL} signal
 * recorded by {@code hookFinally}.
 */
@Test
public void take1Cancel() {
	final AtomicLong delivered = new AtomicLong();
	final AtomicReference<SignalType> finalSignal = new AtomicReference<>();

	BaseSubscriber<Object> subscriber = new BaseSubscriber<Object>() {

		@Override
		protected void hookOnSubscribe(Subscription subscription) {
			requestUnbounded();
		}

		@Override
		public void hookOnNext(Object t) {
			// Count the element, then cancel and complete immediately.
			delivered.incrementAndGet();
			cancel();
			onComplete();
		}

		@Override
		protected void hookFinally(SignalType type) {
			finalSignal.set(type);
		}
	};

	Flux<Integer> filtered = Flux.range(1, 1000)
	                             .filterWhen(v -> Mono.just(true).hide());
	filtered.subscribe(subscriber);

	assertThat(delivered.get()).isEqualTo(1);
	assertThat(finalSignal.get()).isEqualTo(SignalType.CANCEL);
}
 
源代码12 项目: attic-aurora   文件: StatsTest.java
/**
 * Exports three counters whose names have leading and/or trailing spaces, increments each
 * once, and checks them under names where every space is replaced by an underscore.
 */
@Test
public void testNormalizesSpace() {
  final AtomicLong withLeading = Stats.exportLong("  leading space");
  final AtomicLong withTrailing = Stats.exportLong("trailing space   ");
  final AtomicLong withBoth = Stats.exportLong("   surround space   ");

  withLeading.incrementAndGet();
  withTrailing.incrementAndGet();
  withBoth.incrementAndGet();

  // Expected names keep one underscore per original space character.
  assertCounter("__leading_space", 1);
  assertCounter("trailing_space___", 1);
  assertCounter("___surround_space___", 1);
}
 
源代码13 项目: reactor-core   文件: FluxFilterWhenTest.java
/**
 * Subscribes to a {@code filterWhen} flux requesting a single element and cancels from
 * inside the first {@code onNext}. Expects exactly one delivered element and a
 * {@code CANCEL} signal recorded by {@code hookFinally}.
 */
@Test
public void take1CancelBackpressured() {
	final AtomicLong delivered = new AtomicLong();
	final AtomicReference<SignalType> finalSignal = new AtomicReference<>();

	BaseSubscriber<Object> subscriber = new BaseSubscriber<Object>() {

		@Override
		protected void hookOnSubscribe(Subscription subscription) {
			// Demand exactly one element.
			request(1);
		}

		@Override
		public void hookOnNext(Object t) {
			// Count the element, then cancel and complete immediately.
			delivered.incrementAndGet();
			cancel();
			onComplete();
		}

		@Override
		protected void hookFinally(SignalType type) {
			finalSignal.set(type);
		}
	};

	Flux<Integer> filtered = Flux.range(1, 1000)
	                             .filterWhen(v -> Mono.just(true).hide());
	filtered.subscribe(subscriber);

	assertThat(delivered.get()).isEqualTo(1);
	assertThat(finalSignal.get()).isEqualTo(SignalType.CANCEL);
}
 
源代码14 项目: summerframework   文件: TestRedissonLock.java
/**
 * Starts {@code threadCount} threads that repeatedly call
 * {@code testService.updateWithLock} and compare the result with
 * {@code testService.format}; the test passes when no mismatch was counted.
 *
 * <p>A {@code null} result or a runtime exception in a worker is counted as an error, and
 * the latch is always counted down, so a failing worker cannot make the test hang.
 *
 * @throws InterruptedException if interrupted while waiting for the workers
 */
@Test
    public void testUpdateWithLock() throws InterruptedException {
        AtomicLong errorCount = new AtomicLong();
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            int id = i;
            // Per-thread iteration counter.
            AtomicLong count = new AtomicLong();
            Thread t = new Thread(() -> {
                try {
                    while (count.incrementAndGet() < runCount) {
                        // Threads alternate between the two keys "0" and "1".
                        String mapKey = id % 2 + "";
                        String result = testService.updateWithLock(mapKey, id);
                        if (result == null) {
                            log.error("testService.updateWithLock was null");
                            // Count it and skip the comparison, which would otherwise throw an NPE.
                            errorCount.incrementAndGet();
                            continue;
                        }
                        if (!result.equals(testService.format(mapKey, id))) {
                            errorCount.incrementAndGet();
                        }
                    }
                } catch (RuntimeException e) {
                    // An unexpected failure must fail the test instead of silently killing the thread.
                    log.error("testService.updateWithLock failed", e);
                    errorCount.incrementAndGet();
                } finally {
                    // Always release the main thread, whatever happened above.
                    countDownLatch.countDown();
                }
            });
            t.start();
        }
        countDownLatch.await();
        log.info("errorCount = {} ", errorCount);
        Assert.assertTrue(errorCount.get() == 0);
    }
 
源代码15 项目: reactor-core   文件: MonoFilterWhenTest.java
/**
 * Subscribes to a {@code filterWhen} mono requesting a single element and cancels from
 * inside {@code onNext}. Expects exactly one delivered element and a {@code CANCEL} signal
 * recorded by {@code hookFinally}.
 */
@Test
public void take1CancelBackpressured() {
	final AtomicLong delivered = new AtomicLong();
	final AtomicReference<SignalType> finalSignal = new AtomicReference<>();

	BaseSubscriber<Object> subscriber = new BaseSubscriber<Object>() {

		@Override
		protected void hookOnSubscribe(Subscription subscription) {
			// Demand exactly one element.
			request(1);
		}

		@Override
		public void hookOnNext(Object t) {
			// Count the element, then cancel and complete immediately.
			delivered.incrementAndGet();
			cancel();
			onComplete();
		}

		@Override
		protected void hookFinally(SignalType type) {
			finalSignal.set(type);
		}
	};

	Mono<Integer> filtered = Mono.just(1)
	                             .filterWhen(v -> Mono.just(true).hide());
	filtered.subscribe(subscriber);

	assertThat(delivered.get()).isEqualTo(1);
	assertThat(finalSignal.get()).isEqualTo(SignalType.CANCEL);
}
 
源代码16 项目: crate   文件: ClusterFormationFailureHelperTests.java
/**
 * Drives a {@code ClusterFormationFailureHelper} with a deterministic task queue and checks
 * when its warning callback fires: nothing before {@code start()}, the first warning exactly
 * one delay after start, the fifth exactly five delays after start, and no further warnings
 * once {@code stop()} has been called. The same timing is then re-checked after a
 * start/stop/start sequence.
 */
public void testScheduling() {
    // Use either the setting's default timeout or a random explicit one.
    final long expectedDelayMillis;
    final Settings.Builder settingsBuilder = Settings.builder();
    if (randomBoolean()) {
        expectedDelayMillis
            = ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(Settings.EMPTY).millis();
    } else {
        expectedDelayMillis = randomLongBetween(100, 100000);
        settingsBuilder.put(ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.getKey(),
            expectedDelayMillis + "ms");
    }

    // Cluster state containing only the local node.
    final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
    final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
        .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build();

    final DeterministicTaskQueue deterministicTaskQueue
        = new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());

    // One counter per callback handed to the helper.
    final AtomicLong warningCount = new AtomicLong();
    final AtomicLong logLastFailedJoinAttemptWarningCount = new AtomicLong();

    final ClusterFormationFailureHelper clusterFormationFailureHelper = new ClusterFormationFailureHelper(settingsBuilder.build(),
        () -> {
            warningCount.incrementAndGet();
            return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L);
        },
        deterministicTaskQueue.getThreadPool(), () -> logLastFailedJoinAttemptWarningCount.incrementAndGet());

    // Before start() nothing may have been scheduled.
    deterministicTaskQueue.runAllTasks();
    assertThat("should not schedule anything yet", warningCount.get(), is(0L));

    final long startTimeMillis = deterministicTaskQueue.getCurrentTimeMillis();
    clusterFormationFailureHelper.start();

    // Run tasks / advance the simulated clock until the first warning appears.
    while (warningCount.get() == 0) {
        assertTrue(clusterFormationFailureHelper.isRunning());
        if (deterministicTaskQueue.hasRunnableTasks()) {
            deterministicTaskQueue.runRandomTask();
        } else {
            deterministicTaskQueue.advanceTime();
        }
    }
    assertThat(warningCount.get(), is(1L));
    assertThat(deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis, is(expectedDelayMillis));

    // Continue until five warnings were emitted; they must be spaced one delay apart.
    while (warningCount.get() < 5) {
        assertTrue(clusterFormationFailureHelper.isRunning());
        if (deterministicTaskQueue.hasRunnableTasks()) {
            deterministicTaskQueue.runRandomTask();
        } else {
            deterministicTaskQueue.advanceTime();
        }
    }
    assertThat(deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis, equalTo(5 * expectedDelayMillis));

    // After stop(), draining the queue must not produce any additional warnings.
    clusterFormationFailureHelper.stop();
    assertFalse(clusterFormationFailureHelper.isRunning());
    deterministicTaskQueue.runAllTasksInTimeOrder();

    assertThat(warningCount.get(), is(5L));
    assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L));

    // Second round: start, stop, start again. The elapsed-time check below only holds if the
    // stopped first run contributes no warnings of its own.
    warningCount.set(0);
    logLastFailedJoinAttemptWarningCount.set(0);
    clusterFormationFailureHelper.start();
    clusterFormationFailureHelper.stop();
    clusterFormationFailureHelper.start();
    final long secondStartTimeMillis = deterministicTaskQueue.getCurrentTimeMillis();

    while (warningCount.get() < 5) {
        assertTrue(clusterFormationFailureHelper.isRunning());
        if (deterministicTaskQueue.hasRunnableTasks()) {
            deterministicTaskQueue.runRandomTask();
        } else {
            deterministicTaskQueue.advanceTime();
        }
    }
    assertThat(deterministicTaskQueue.getCurrentTimeMillis() - secondStartTimeMillis, equalTo(5 * expectedDelayMillis));

    // Again, stopping must freeze both counters at five.
    clusterFormationFailureHelper.stop();
    assertFalse(clusterFormationFailureHelper.isRunning());
    deterministicTaskQueue.runAllTasksInTimeOrder();

    assertThat(warningCount.get(), is(5L));
    assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L));
}
 
源代码17 项目: incubator-retired-blur   文件: BlockCacheTest.java
/**
 * Performs 10000 random fetch/store/fetch rounds against the given block cache and prints
 * hit/miss counts and average store/fetch times.
 *
 * @param blockSize size in bytes of each block and of the work buffers
 * @param file file id set on the cache key
 * @param blocksInTest exclusive upper bound for the randomly chosen block number
 * @param blockCache cache under test
 * @return {@code false} as soon as data fetched right after a successful store differs from
 *         what was stored, {@code true} otherwise
 */
private boolean runTest(int blockSize, int file, int blocksInTest, BlockCache blockCache) {
  final int passes = 10000;
  final byte[] buffer = new byte[blockSize];
  final Random random = new Random();
  final byte[] newData = new byte[blockSize];

  final AtomicLong hitsInCache = new AtomicLong();
  final AtomicLong missesInCache = new AtomicLong();
  long storeNanos = 0;
  long fetchNanos = 0;

  // A single key instance is reused and re-pointed on every pass.
  final BlockCacheKey key = new BlockCacheKey();

  for (int pass = 0; pass < passes; pass++) {
    long block = random.nextInt(blocksInTest);
    key.setBlock(block);
    key.setFile(file);

    // Probe first to record whether this block was already cached.
    AtomicLong tally = blockCache.fetch(key, buffer) ? hitsInCache : missesInCache;
    tally.incrementAndGet();

    byte[] testData = testData(random, blockSize, newData);
    long storeStart = System.nanoTime();
    boolean stored = blockCache.store(key, 0, testData, 0, blockSize);
    storeNanos += (System.nanoTime() - storeStart);
    if (!stored) {
      continue;
    }

    // Read back what was just written; fetch time is only recorded on success.
    long fetchStart = System.nanoTime();
    if (blockCache.fetch(key, buffer)) {
      fetchNanos += (System.nanoTime() - fetchStart);
      if (!Arrays.equals(testData, buffer)) {
        return false;
      }
    }
  }

  System.out.println("Cache Hits    = " + hitsInCache.get());
  System.out.println("Cache Misses  = " + missesInCache.get());
  System.out.println("Store         = avg " + (storeNanos / (double) passes) / 1000000.0 + " ms");
  System.out.println("Fetch         = avg " + (fetchNanos / (double) passes) / 1000000.0 + " ms");
  System.out.println("# of Elements = " + blockCache.getSize());
  return true;
}
 
/**
 * Case 28-jul-2016
 * Simultaneous usage of expression evaluator from multiple threads.
 *
 * <p>Two prepared parsers (one for the IP header, one for the TCP segment) are shared by
 * 15 threads; each thread parses the same 173-byte test packet 1000 times. The test passes
 * when every iteration in every thread completed and no exception was counted.
 *
 * <a href="https://github.com/raydac/java-binary-block-parser/issues/10">Issue #10, assertArrayLength throws exception in multi-thread</a>
 *
 * @throws Exception for any error
 */
@Test
public void testMutlithredUsageOfParser() throws Exception {
  // Parser for the IP header; the Options length is computed from InternetHeaderLength.
  final JBBPParser parserIP = JBBPParser.prepare("skip:14; // skip bytes till the frame\n"
      + "bit:4 InternetHeaderLength;"
      + "bit:4 Version;"
      + "bit:2 ECN;"
      + "bit:6 DSCP;"
      + "ushort TotalPacketLength;"
      + "ushort Identification;"
      + "bit:8 IPFlagsAndFragmentOffset_low;"
      + "bit:5 IPFlagsAndFragmentOffset_high;"
      + "bit:1 MoreFragment;"
      + "bit:1 DonotFragment;"
      + "bit:1 ReservedBit;"
      + "ubyte TTL;"
      + "ubyte Protocol;"
      + "ushort HeaderChecksum;"
      + "int SourceAddress;"
      + "int DestinationAddress;"
      + "byte [(InternetHeaderLength-5)*4] Options;");

  // Parser for the TCP segment; the Option length is computed from HLEN and Data takes the rest.
  final JBBPParser parserTCP = JBBPParser.prepare("skip:34; // skip bytes till the frame\n"
      + "ushort SourcePort;"
      + "ushort DestinationPort;"
      + "int SequenceNumber;"
      + "int AcknowledgementNumber;"
      + "bit:1 NONCE;"
      + "bit:3 RESERVED;"
      + "bit:4 HLEN;"
      + "bit:1 FIN;"
      + "bit:1 SYN;"
      + "bit:1 RST;"
      + "bit:1 PSH;"
      + "bit:1 ACK;"
      + "bit:1 URG;"
      + "bit:1 ECNECHO;"
      + "bit:1 CWR;"
      + "ushort WindowSize;"
      + "ushort TCPCheckSum;"
      + "ushort UrgentPointer;"
      + "byte [HLEN*4-20] Option;"
      + "byte [_] Data;");

  // Load the whole sample packet once; all threads read the same array.
  byte[] testArray;
  try (InputStream inStream = getResourceAsInputStream("tcppacket.bin")) {
    testArray = new JBBPBitInputStream(inStream).readByteArray(-1);
    assertEquals(173, testArray.length);
  }

  // Effectively-final alias so the lambda below can capture the data.
  final byte[] theData = testArray;

  final AtomicInteger errorCounter = new AtomicInteger();
  final AtomicLong parsingCounter = new AtomicLong();

  final int ITERATIONS = 1000;

  // Work executed by every thread: parse with the TCP parser, then feed its Data field to the IP parser.
  final Runnable test = () -> {
    for (int i = 0; i < ITERATIONS; i++) {
      try {
        // Sleep 0..15 ms (low four bits of nanoTime) to vary how the threads interleave.
        Thread.sleep(System.nanoTime() & 0xF);
        final byte[] ippacket = parserTCP.parse(theData).findFieldForNameAndType("Data", JBBPFieldArrayByte.class).getArray();
        assertEquals(119, ippacket.length);
        final byte[] optionsip = parserIP.parse(ippacket).findFieldForNameAndType("Options", JBBPFieldArrayByte.class).getArray();
        assertEquals(4, optionsip.length);
        parsingCounter.incrementAndGet();
      } catch (Exception ex) {
        // NOTE(review): if assertEquals throws an Error subtype (e.g. AssertionError) it is not
        // caught here; such a failure would then surface only through the parsingCounter check — confirm.
        ex.printStackTrace();
        errorCounter.incrementAndGet();
      }
    }
  };

  final Thread[] threads = new Thread[15];

  for (int i = 0; i < threads.length; i++) {
    final Thread testThread = new Thread(test, "jbbp_test_thread" + i);
    testThread.setDaemon(true);
    threads[i] = testThread;
    testThread.start();
  }

  // Wait for all workers before checking the counters.
  for (final Thread t : threads) {
    t.join();
  }

  // Every iteration of every thread must have reached the end of the try block.
  assertEquals(threads.length * ITERATIONS, parsingCounter.get());
  assertEquals(0, errorCounter.get());
}
 
源代码19 项目: adt   文件: MysqlBinlogProcessorTest.java
/**
 * Runs a {@code MysqlBinlogProcessor} against the configured test database for 30 seconds,
 * starting from the first binary log file, and checks once per second that the input,
 * execute, output and processed-task counters never decrease.
 *
 * <p>The processor is stopped in a {@code finally} block so that a failing assertion or an
 * interrupted sleep does not leave it running.
 *
 * @throws Exception propagated from server inspection, the processor, or the sleep
 */
@Test
public void test_RowEventProcessor() throws Exception{
    
    final MysqlServerInfo serverInfo = new MysqlServerInfo(DB_HOST, DB_PORT, DB_USER, DB_PW, Collections.emptyList(), true);
    // Start reading from the first binary log file the server reports.
    final String startLogFileName = serverInfo.getBinaryLogFileList().get(0).getLogName();
    
    final MysqlBinlogProcessorConfig config = new MysqlBinlogProcessorConfig();
    config.host = DB_HOST;
    config.port = DB_PORT;
    config.user = DB_USER;
    config.password = DB_PW;
    config.databaseList = Arrays.asList(DB_TEST_SCHEMA);
    config.mysqlSlaveServerId = DB_SLAVE_SERVER_ID;
    config.binlogFileName = startLogFileName;
    config.binlogFilePosition = 4;
    config.taskQueueCount = 256;
    config.eventBufferSize = 1000*1000;
    config.workerThreadCorePoolSize = 4;
    config.handlerClassName = TestMysqlBinlogProcessorHandler.class.getName();
    
    final int runtimeInSec = 30;
    // Number of processData callbacks received so far.
    final AtomicLong accTaskProcCount = new AtomicLong(0);
    
    MysqlBinlogProcessor processor = new MysqlBinlogProcessor(config) {
        
        @Override
        public void processData(MysqlBinlogData data) {
            accTaskProcCount.incrementAndGet();
        }

        @Override
        public int getMaxWorkerCount() {
            return this.threadPool.getMaximumPoolSize();
        }
    };
    processor.start();
    
    try {
        long startTime = System.currentTimeMillis();
        // Counter values seen in the previous sample.
        long exInputCnt = 0;
        long exExecuteCnt = 0;
        long exOutputCnt = 0;
        long exTaskProcCnt = 0;
        
        for(int i=0; i<runtimeInSec; i++){
            Thread.sleep(1000);
            long deltaTime = System.currentTimeMillis() - startTime;
            long curInputCnt = processor.getInputCount();
            long curExecuteCnt = processor.getExecuteCount();
            long curOutputCnt = processor.getOutputCount();
            long curTaskProcCnt = accTaskProcCount.get();
            
            System.out.println(String.format("%d, %d, %d, %d", curInputCnt, curExecuteCnt, curOutputCnt, curTaskProcCnt));
            System.out.println(String.format(
                    "AVG   %d i/msec,   %d e/msec,   %d o/msec,   %d p/msec", 
                    curInputCnt/deltaTime,
                    curExecuteCnt/deltaTime,
                    curOutputCnt/deltaTime,
                    curTaskProcCnt/deltaTime));
            
            // Each counter must be monotonically non-decreasing between samples.
            assertTrue(exInputCnt <= curInputCnt);
            assertTrue(exExecuteCnt <= curExecuteCnt);
            assertTrue(exOutputCnt <= curOutputCnt);
            assertTrue(exTaskProcCnt <= curTaskProcCnt);
            
            exInputCnt = curInputCnt;
            exExecuteCnt = curExecuteCnt;
            exOutputCnt = curOutputCnt;
            exTaskProcCnt = curTaskProcCnt;
            
        }
    } finally {
        // Stop the processor even when the monitoring loop failed.
        processor.stop();
    }
       
}
 
源代码20 项目: reactor-core   文件: ScatterGatherTests.java
/**
 * Increments the counter kept for the given colour, creating it on first use.
 *
 * <p>The counter is looked up and created in a single {@code computeIfAbsent} call. The
 * earlier {@code getOrDefault} + {@code putIfAbsent} sequence could increment a fresh
 * {@code AtomicLong} that lost the race to be stored in the map, dropping that increment.
 *
 * @param colour key whose counter is incremented
 * @return the counter value after the increment
 */
public long add(String colour) {
	return counts.computeIfAbsent(colour, key -> new AtomicLong()).incrementAndGet();
}