java.util.concurrent.atomic.AtomicInteger#incrementAndGet()源码实例Demo

下面列出了java.util.concurrent.atomic.AtomicInteger#incrementAndGet() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: activemq-artemis   文件: MeshTest.java
/**
 * Runs the receiver script ("receiveNonDurableSubscription") on a background thread and,
 * once the shared latch has been released (waiting at most 10 seconds), runs the sender
 * script ("sendTopic") on the test thread. The test passes when the receiver thread
 * finishes without having recorded an exception.
 */
@Test
public void testSendReceiveTopicNonDurable() throws Throwable {
   AtomicInteger errors = new AtomicInteger(0);
   CountDownLatch latch = new CountDownLatch(1);

   // The latch must be visible to the receiver classloader before the receiver script starts.
   setVariable(receiverClassloader, "latch", latch);

   Thread receiverThread = new Thread(() -> {
      try {
         evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveNonDurableSubscription");
      } catch (Exception e) {
         // Failures on this thread cannot fail the test directly, so they are counted instead.
         e.printStackTrace();
         errors.incrementAndGet();
      }
   });
   receiverThread.start();

   boolean latchReleased = latch.await(10, TimeUnit.SECONDS);
   Assert.assertTrue(latchReleased);

   evaluate(senderClassloader,"meshTest/sendMessages.groovy", server, sender, "sendTopic");

   receiverThread.join();

   Assert.assertEquals(0, errors.get());
}
 
源代码2 项目: brooklyn-server   文件: TaskPerformanceTest.java
/**
 * Measures how fast tagged runnables can be submitted to the execution manager.
 * The job is repeated {@code numIterations} times with
 * {@code 1000 * PERFORMANCE_EXPECTATION} passed as the minimum acceptable rate per second.
 */
@Test(groups={"Integration", "Acceptance"})
public void testExecuteRunnableWithTags() {
    final double minRatePerSec = 1000 * PERFORMANCE_EXPECTATION;

    final CountDownLatch completionLatch = new CountDownLatch(1);
    final AtomicInteger executed = new AtomicInteger();

    // Every executed task bumps the counter; the latch opens once numIterations is reached.
    final Runnable work = new Runnable() {
        @Override
        public void run() {
            if (executed.incrementAndGet() >= numIterations) {
                completionLatch.countDown();
            }
        }
    };

    final Map<String, ?> flags = MutableMap.of("tags", ImmutableList.of("a","b"));

    // The measured job: a single submission of the work item with the tag flags.
    final Runnable submitWork = new Runnable() {
        @Override
        public void run() {
            executionManager.submit(flags, work);
        }
    };

    measure(PerformanceTestDescriptor.create()
            .summary("TaskPerformanceTest.testExecuteRunnableWithTags")
            .iterations(numIterations)
            .minAcceptablePerSecond(minRatePerSec)
            .job(submitWork)
            .completionLatch(completionLatch));
}
 
源代码3 项目: Study_Android_Demo   文件: SettingsProvider.java
/**
 * Updates rows of the settings table addressed by {@code url}.
 *
 * <p>Updates against the favorites table are ignored (0 is returned); updates against the
 * global table are always performed as {@code UserHandle.USER_OWNER}.
 *
 * @param url content URI identifying the table (and optionally the row)
 * @param initialValues the new column values
 * @param where optional selection clause
 * @param whereArgs arguments for {@code where}
 * @return the number of rows reported as updated by the database, or 0 for the favorites table
 */
@Override
public int update(Uri url, ContentValues initialValues, String where, String[] whereArgs) {
    // NOTE: update() is never called by the front-end Settings API, and updates that
    // wind up affecting rows in Secure that are globally shared will not have the
    // intended effect (the update will be invisible to the rest of the system).
    // This should have no practical effect, since writes to the Secure db can only
    // be done by system code, and that code should be using the correct API up front.
    int callingUser = UserHandle.getCallingUserId();
    if (LOCAL_LOGV) Slog.v(TAG, "update() for user " + callingUser);
    SqlArguments args = new SqlArguments(url, where, whereArgs);
    if (TABLE_FAVORITES.equals(args.table)) {
        return 0;
    } else if (TABLE_GLOBAL.equals(args.table)) {
        callingUser = UserHandle.USER_OWNER;
    }
    checkWritePermissions(args);
    checkUserRestrictions(initialValues.getAsString(Settings.Secure.NAME), callingUser);

    // The per-user counter is looked up under the provider lock; it may be absent.
    final AtomicInteger mutationCount;
    synchronized (this) {
        mutationCount = sKnownMutationsInFlight.get(callingUser);
    }
    if (mutationCount != null) {
        mutationCount.incrementAndGet();
    }
    final int count;
    try {
        DatabaseHelper dbH = getOrEstablishDatabase(callingUser);
        SQLiteDatabase db = dbH.getWritableDatabase();
        count = db.update(args.table, initialValues, args.where, args.args);
    } finally {
        // Always undo the increment: a throwing database call must not leave the
        // in-flight counter permanently raised.
        if (mutationCount != null) {
            mutationCount.decrementAndGet();
        }
    }
    if (count > 0) {
        invalidateCache(callingUser, args.table);  // before we notify
        sendNotify(url, callingUser);
    }
    startAsyncCachePopulation(callingUser);
    if (LOCAL_LOGV) Log.v(TAG, args.table + ": " + count + " row(s) <- " + initialValues);
    return count;
}
 
源代码4 项目: servicetalk   文件: TestExecutor.java
/**
 * Drains {@code tasks} in iteration order, running each task on the calling thread.
 *
 * @param tasks queue whose elements are removed and run
 * @param taskCount counter incremented once per task, just before that task runs
 */
private static void execute(Queue<RunnableWrapper> tasks, AtomicInteger taskCount) {
    final Iterator<RunnableWrapper> pending = tasks.iterator();
    while (pending.hasNext()) {
        final Runnable next = pending.next();
        // Dequeue and count before running, so a task that throws is already removed and counted.
        pending.remove();
        taskCount.incrementAndGet();
        next.run();
    }
}
 
源代码5 项目: flow   文件: ElementTest.java
/**
 * Registers a "click" listener on a div element and checks that it is not invoked by
 * registration alone, and is invoked exactly once when a single "click" event is fired.
 */
@Test
public void listenerReceivesEvents() {
    AtomicInteger clickCount = new AtomicInteger(0);
    Element div = ElementFactory.createDiv();

    DomEventListener countingListener = event -> {
        clickCount.incrementAndGet();
    };
    div.addEventListener("click", countingListener);

    // Nothing has been fired yet.
    Assert.assertEquals(0, clickCount.get());

    div.getNode().getFeature(ElementListenerMap.class)
            .fireEvent(new DomEvent(div, "click", Json.createObject()));

    Assert.assertEquals(1, clickCount.get());
}
 
源代码6 项目: rocketmq   文件: AbstractTestCase.java
/**
 * Counts the entries of {@code messages} whose body, converted to a string, contains
 * {@code key}.
 *
 * @param count not read by this implementation
 * @param key substring to look for in each message body
 * @param timeout not read by this implementation
 * @return how many message bodies contain {@code key}
 */
protected int consumeMessages(int count, final String key, int timeout) {
    int matches = 0;
    for (Message candidate : messages) {
        // No charset is passed to the String constructor here.
        final boolean bodyHasKey = new String(candidate.getBody()).contains(key);
        if (bodyHasKey) {
            matches++;
        }
    }
    return matches;
}
 
源代码7 项目: vespa   文件: JsonReader.java
/**
 * Reads documents from one input stream and streams each of them to the feed client.
 *
 * <p>Documents are parsed one after another until an {@link EOFException} ends the loop,
 * which is treated as "no more documents". A document without an ID aborts the read with
 * an exception whose message quotes at most the first 500 characters of that document.
 *
 * @param inputStream source of array of json document.
 * @param feedClient where data is sent.
 * @param numSent counter to be incremented for every document streamed.
 * @throws IllegalArgumentException if a document is missing its ID
 * @throws UncheckedIOException if any other {@link IOException} occurs while reading
 */
public static void read(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) {
    final int maxTruncatedLength = 500;
    try (InputStreamJsonElementBuffer jsonElementBuffer = new InputStreamJsonElementBuffer(inputStream)) {
        JsonParser jParser = new JsonFactory()
                .disable(JsonFactory.Feature.CANONICALIZE_FIELD_NAMES)
                .createParser(jsonElementBuffer);
        for (;;) {
            // Remember where this document begins so it can be quoted if it turns out to be invalid.
            int documentStart = (int) jParser.getCurrentLocation().getCharOffset();
            String docId = parseOneDocument(jParser);

            if (docId != null) {
                CharSequence data = jsonElementBuffer.getJsonAsArray(jParser.getCurrentLocation().getCharOffset());
                feedClient.stream(docId, data);
                numSent.incrementAndGet();
                continue;
            }

            // No ID: build a (possibly truncated) excerpt of the offending document for the error.
            int documentLength = (int) jParser.getCurrentLocation().getCharOffset() - documentStart;
            int excerptLength = Math.min(documentLength, maxTruncatedLength);
            StringBuilder excerpt = new StringBuilder(maxTruncatedLength + 3);
            for (int offset = 0; offset < excerptLength; offset++) {
                excerpt.append(jsonElementBuffer.circular.get(documentStart + offset));
            }
            if (documentLength > maxTruncatedLength) {
                excerpt.append("...");
            }
            throw new IllegalArgumentException("Document is missing ID: '" + excerpt + "'");
        }
    } catch (EOFException ignored) {
        // No more documents
    } catch (IOException ioe) {
        System.err.println(ioe.getMessage());
        throw new UncheckedIOException(ioe);
    }
}
 
/**
 * Starts a task that blocks on a latch, cancels it once the listener has seen it start,
 * and verifies that the listener was told about the cancellation and nothing else.
 */
@Test
void theWorkerShouldCancelAnInProgressTask() throws InterruptedException {
    CountDownLatch blocker = new CountDownLatch(1);
    AtomicInteger counter = new AtomicInteger(0);

    // The latch is never counted down in this test, so the task body stays in await(...)
    // until it is cancelled.
    Task blockedTask = new MemoryReferenceTask(() -> {
        await(blocker);
        counter.incrementAndGet();
        return Task.Result.COMPLETED;
    });

    TaskId id = TaskId.generateTaskId();
    TaskWithId taskWithId = new TaskWithId(id, blockedTask);

    Mono<Task.Result> execution = worker.executeTask(taskWithId).cache();
    execution.subscribe();

    // Only cancel once the worker has reported the task as started.
    Awaitility.waitAtMost(org.awaitility.Duration.TEN_SECONDS)
        .untilAsserted(() -> verify(listener, atLeastOnce()).started(id));

    worker.cancelTask(id);

    execution.block(Duration.ofSeconds(10));

    // Due to the use of signals, cancellation cannot be instantaneous
    // Let a grace period for the cancellation to complete to increase test stability
    Thread.sleep(50);

    verify(listener, atLeastOnce()).cancelled(id, Optional.empty());
    verifyNoMoreInteractions(listener);
}
 
源代码9 项目: grpc-java-contrib   文件: PerCallServiceTest.java
/**
 * Registers a {@code PerCallService} around a local greeter implementation whose reply is
 * its own identity hash code, then checks that three calls produce three different replies
 * and that {@code close()} was invoked three times.
 */
@Test
public void perCallShouldInstantiateMultipleInstances() throws Exception {
    AtomicInteger closeCount = new AtomicInteger(0);

    class TestService extends GreeterGrpc.GreeterImplBase implements AutoCloseable {
        public TestService() {}

        @Override
        public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
            // The reply identifies the instance that served the call.
            String identity = Integer.toString(System.identityHashCode(this));
            responseObserver.onNext(HelloResponse.newBuilder().setMessage(identity).build());
            responseObserver.onCompleted();
        }

        @Override
        public void close() {
            closeCount.incrementAndGet();
        }
    }

    serverRule.getServiceRegistry().addService(new PerCallService<TestService>(TestService::new));

    GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(serverRule.getChannel());

    String first = stub.sayHello(HelloRequest.getDefaultInstance()).getMessage();
    String second = stub.sayHello(HelloRequest.getDefaultInstance()).getMessage();
    String third = stub.sayHello(HelloRequest.getDefaultInstance()).getMessage();

    // Pairwise distinct replies mean pairwise distinct service instances.
    assertThat(first).isNotEqualTo(second);
    assertThat(first).isNotEqualTo(third);
    assertThat(second).isNotEqualTo(third);

    // let the threads catch up :(
    Thread.sleep(100);

    assertThat(closeCount.get()).isEqualTo(3);
}
 
源代码10 项目: brooklyn-server   文件: ScheduledExecutionTest.java
/**
 * Submits a scheduled task (delay of two periods, then one run per 20 ms period) whose
 * runs increment a counter and cancel the submitting scheduled task once the counter
 * reaches 5. After the scheduled task has ended, the counter must be exactly 5.
 */
@Test
public void testScheduledTaskCancelEnding() throws Exception {
    final Duration period = Duration.millis(20);
    final AtomicInteger runs = new AtomicInteger();
    final BasicExecutionManager manager = new BasicExecutionManager("mycontextid");

    // Produces the task for each scheduled run (presumably invoked once per period -- confirm).
    final Callable<Task<?>> taskFactory = new Callable<Task<?>>() {
        @Override
        public Task<?> call() throws Exception {
            return new BasicTask<Integer>(new Callable<Integer>() {
                @Override
                public Integer call() {
                    log.info("task running ("+runs+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
                    ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask();
                    runs.incrementAndGet();
                    if (runs.get() >= 5) {
                        // Stop the schedule from inside one of its own runs.
                        submitter.cancel();
                    }
                    return runs.get();
                }
            });
        }
    };
    final ScheduledTask scheduled = new ScheduledTask(MutableMap.of("delay", period.times(2), "period", period), taskFactory);

    log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", scheduled, scheduled.getStatusDetail(false));
    manager.submit(scheduled);
    log.info("submitted {} {}", scheduled, scheduled.getStatusDetail(false));

    Integer interimResult = (Integer) scheduled.get();
    log.info("done one ({}) {} {}", new Object[] {interimResult, scheduled, scheduled.getStatusDetail(false)});
    assertTrue(runs.get() > 0);

    scheduled.blockUntilEnded();
    log.info("ended ({}) {} {}", new Object[] {runs, scheduled, scheduled.getStatusDetail(false)});
    assertEquals(runs.get(), 5);
}
 
源代码11 项目: Study_Android_Demo   文件: SettingsProvider.java
/**
 * Deletes rows of the settings table addressed by {@code url}.
 *
 * <p>Deletes against the favorites table are ignored (0 is returned); the old-favorites
 * table is redirected to the favorites table; deletes against the global table are always
 * performed as {@code UserHandle.USER_OWNER}.
 *
 * @param url content URI identifying the table (and optionally the row)
 * @param where optional selection clause
 * @param whereArgs arguments for {@code where}
 * @return the number of rows reported as deleted by the database, or 0 for the favorites table
 */
@Override
public int delete(Uri url, String where, String[] whereArgs) {
    int callingUser = UserHandle.getCallingUserId();
    if (LOCAL_LOGV) Slog.v(TAG, "delete() for user " + callingUser);
    SqlArguments args = new SqlArguments(url, where, whereArgs);
    if (TABLE_FAVORITES.equals(args.table)) {
        return 0;
    } else if (TABLE_OLD_FAVORITES.equals(args.table)) {
        args.table = TABLE_FAVORITES;
    } else if (TABLE_GLOBAL.equals(args.table)) {
        callingUser = UserHandle.USER_OWNER;
    }
    checkWritePermissions(args);

    // The per-user counter is looked up under the provider lock; it may be absent.
    final AtomicInteger mutationCount;
    synchronized (this) {
        mutationCount = sKnownMutationsInFlight.get(callingUser);
    }
    if (mutationCount != null) {
        mutationCount.incrementAndGet();
    }
    final int count;
    try {
        DatabaseHelper dbH = getOrEstablishDatabase(callingUser);
        SQLiteDatabase db = dbH.getWritableDatabase();
        count = db.delete(args.table, args.where, args.args);
    } finally {
        // Always undo the increment: a throwing database call must not leave the
        // in-flight counter permanently raised.
        if (mutationCount != null) {
            mutationCount.decrementAndGet();
        }
    }
    if (count > 0) {
        invalidateCache(callingUser, args.table);  // before we notify
        sendNotify(url, callingUser);
    }
    startAsyncCachePopulation(callingUser);
    if (LOCAL_LOGV) Log.v(TAG, args.table + ": " + count + " row(s) deleted");
    return count;
}
 
源代码12 项目: mantis   文件: OperatorResumeOnCompletedTest.java
/**
 * Lifts a range of {@code max} integers through {@code OperatorResumeOnCompleted} with a
 * policy that resumes with a single extra value for the first {@code repeat} attempts and
 * returns {@code null} afterwards. Expects exactly {@code repeat} resumptions, a single
 * {@code onCompleted}, and the values 1..(max + repeat) in order.
 */
@Test
public void testCanResumeOnCompletion() throws Exception {
    final int max = 6;
    final int repeat = 5;
    final long timeoutSecs = 5;

    final AtomicInteger retries = new AtomicInteger();
    final ResumeOnCompletedPolicy<Integer> policy = new ResumeOnCompletedPolicy<Integer>() {
        @Override
        public Observable<Integer> call(final Integer attempts) {
            if (attempts <= repeat) {
                retries.incrementAndGet();
                // Continue the sequence right after the values emitted so far.
                return Observable.just(attempts + max);
            }
            return null;
        }
    };
    Operator<Integer, Integer> resumeOperator = new OperatorResumeOnCompleted<>(policy);

    final CountDownLatch done = new CountDownLatch(1);
    final AtomicInteger completionCount = new AtomicInteger();
    final List<Integer> collected = new ArrayList<>();

    final Observable<Integer> ints = Observable.range(1, max);
    ints
            .lift(resumeOperator)
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onNext(Integer integer) {
                    collected.add(integer);
                }

                @Override
                public void onError(Throwable e) {
                    fail("There should be no error at all");
                    done.countDown();
                }

                @Override
                public void onCompleted() {
                    completionCount.incrementAndGet();
                    done.countDown();
                }
            });

    boolean finished = done.await(timeoutSecs, TimeUnit.SECONDS);
    if (!finished) {
        fail("Should finish within " + timeoutSecs + " seconds");
    }

    assertEquals(String.format("There should be exactly %d retries", repeat), repeat, retries.get());
    assertEquals("There should be exactly one onCompleted call", 1, completionCount.get());
    List<Integer> expected = Observable.range(1, max + repeat).toList().toBlocking().first();
    assertEquals("The collected should include the original stream plus every attempt", expected, collected);
}
 
源代码13 项目: gumtree-spoon-ast-diff   文件: file_s.java
/**
 * Processes the given pairs one after another on the calling thread, aggregating each
 * sub-exchange into {@code result} as it completes.
 *
 * @param original the original exchange, passed through to the per-pair processing
 * @param result holder that receives the aggregated (or failed) exchange
 * @param pairs the processor/exchange pairs to process in iteration order
 * @param callback callback passed through to the per-pair processing
 * @return {@code false} as soon as one pair reports that it continues asynchronously;
 *         {@code true} when all pairs were processed, or when processing stopped early
 *         because {@code stopOnException} is set and a sub-exchange must not continue
 */
protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception {
    final AtomicInteger total = new AtomicInteger();

    for (Iterator<ProcessorExchangePair> it = pairs.iterator(); it.hasNext();) {
        final ProcessorExchangePair pair = it.next();
        final Exchange subExchange = pair.getExchange();
        updateNewExchange(subExchange, total.get(), pairs, it);

        final boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total);
        if (!sync) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", pair.getExchange().getExchangeId());
            }
            // the remainder of the multicast will be completed async
            // so we break out now, then the callback will be invoked which then continue routing from where we left here
            return false;
        }

        if (LOG.isTraceEnabled()) {
            LOG.trace("Processing exchangeId: {} is continued being processed synchronously", pair.getExchange().getExchangeId());
        }

        // Decide whether to continue with the multicast or not; similar logic to the Pipeline
        // remember to test for stop on exception and aggregate before copying back results
        final String failureMessage = "Sequential processing failed for number " + total.get();
        final boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, failureMessage, LOG);
        if (stopOnException && !continueProcessing) {
            final Exception failure = subExchange.getException();
            if (failure != null) {
                // wrap in exception to explain where it failed
                subExchange.setException(new CamelExchangeException(failureMessage, subExchange, failure));
            }
            // we want to stop on exception, and the exception was handled by the error handler
            // this is similar to what the pipeline does, so we should do the same to not surprise end users
            // so we should set the failed exchange as the result and be done
            result.set(subExchange);
            return true;
        }

        LOG.trace("Sequential processing complete for number {} exchange: {}", total, subExchange);

        if (parallelAggregate) {
            doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
        } else {
            doAggregate(getAggregationStrategy(subExchange), result, subExchange);
        }

        // Only successfully aggregated exchanges are counted.
        total.incrementAndGet();
    }

    LOG.debug("Done sequential processing {} exchanges", total);

    return true;
}
 
/**
 * Uses one LRU stripe and {@code mem = 10}, inserts ten 100-byte values per partition, and
 * expects every single insert to trigger exactly one eviction and leave the map empty.
 */
@Test
public void testLru1() {
    mem = 10;
    lruStripes = 1;

    final AtomicInteger evictions = new AtomicInteger();

    evictLsnr = new GridOffHeapEvictListener() {
        @Override public void onEvict(int part, int hash, byte[] k, byte[] v) {
            info("Evicted key: " + new String(k));

            evictions.incrementAndGet();
        }

        @Override public boolean removeEvicted() {
            return true;
        }
    };

    map = newMap();

    for (int partition = 0; partition < parts; partition++) {
        for (int attempt = 0; attempt < 10; attempt++) {
            String key = string();

            byte[] keyBytes = key.getBytes();
            byte[] valBytes = bytes(100);

            map.insert(partition, hash(key), keyBytes, valBytes);

            info("Evicted: " + evictions);

            assertEquals(1, evictions.get());
            assertEquals(0, map.size());

            // Reset the counter for the next insert; the CAS also re-checks that it was 1.
            assertTrue(evictions.compareAndSet(1, 0));
        }
    }
}
 
源代码15 项目: openapi-generator   文件: OnceLogger.java
/**
 * Records one more occurrence of {@code msg} and reports whether it is still within the
 * allowed number of repetitions.
 *
 * @param msg the message about to be logged; used as the cache key
 * @return {@code true} while the occurrence count (including this one) does not exceed
 *         {@code maxRepetitions}
 */
@SuppressWarnings("ConstantConditions")
private boolean shouldLog(final String msg) {
    // A counter starting at zero is created the first time a message is seen.
    final int occurrences = messageCountCache
            .get(msg, key -> new AtomicInteger(0))
            .incrementAndGet();
    return occurrences <= maxRepetitions;
}
 
源代码16 项目: java-client-api   文件: WriteHostBatcherTest.java
/**
 * Writes 15 documents from each of three threads through a shared write batcher
 * (batch size 99, 10 batcher threads) and checks that the database then holds 45 documents.
 * A fourth thread sleeps 15 seconds and then walks all live threads, printing the names
 * of those containing "pool-1-thread-".
 */
@Ignore
public void testAddMultiThreadedLessDocsSuccess() throws Exception {
	System.out.println("In testAddMultiThreadedLessDocsSuccess method");

	final String query1 = "fn:count(fn:doc())";
	final AtomicInteger count = new AtomicInteger(0);

	ihbMT = dmManager.newWriteBatcher();
	ihbMT.withBatchSize(99);
	ihbMT.withThreadCount(10);
	// ihbMT.withTransactionSize(3);

	ihbMT.onBatchSuccess(batch -> {

	}).onBatchFailure((batch, throwable) -> {
		throwable.printStackTrace();

	});
	dmManager.startJob(ihbMT);

	// Adds 15 documents with thread-specific URIs, then flushes the shared batcher.
	class MyRunnable implements Runnable {

		@Override
		public void run() {

			for (int j = 0; j < 15; j++) {
				String uri = "/local/json-" + j + "-" + Thread.currentThread().getId();
				System.out.println("Thread name: " + Thread.currentThread().getName() + "  URI:" + uri);
				ihbMT.add(uri, fileHandle);
			}
			ihbMT.flushAndWait();
		}

	}

	// Waits, then enumerates the live threads of this JVM.
	class CountRunnable implements Runnable {

		@Override
		public void run() {
			try {
				// sleep is static: it always acts on the current thread
				Thread.sleep(15000L);
			} catch (InterruptedException e) {
				// Restore the interrupt flag instead of swallowing it, then carry on as before.
				Thread.currentThread().interrupt();
				e.printStackTrace();
			}
			for (Thread t : Thread.getAllStackTraces().keySet()) {
				if (t.getName().contains("pool-1-thread-")) {
					System.out.println(t.getName());
				}
				// Counts every live thread, not only the ones printed above.
				count.incrementAndGet();
			}

		}

	}
	Thread countT = new Thread(new CountRunnable());

	Thread t1 = new Thread(new MyRunnable());
	Thread t2 = new Thread(new MyRunnable());
	Thread t3 = new Thread(new MyRunnable());

	countT.start();
	t1.start();
	t2.start();
	t3.start();

	countT.join();

	t1.join();
	t2.join();
	t3.join();

	// Assert.assertTrue(count.intValue()==10);
	// assertEquals reports the actual document count when the check fails.
	Assert.assertEquals(45, dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue());
	clearDB(port);
}
 
源代码17 项目: nd4j   文件: AtomicAllocator.java
/**
 * This method seeks for unused zero-copy memory allocations
 *
 * <p>Every tracked host allocation of the bucket whose status is {@code HOST} and whose
 * buffer is {@code null} is purged; all other {@code HOST} allocations are counted as
 * survivors.
 *
 * @param bucketId Id of the bucket, serving allocations
 * @param aggressiveness configured aggressiveness; only feeds the threshold values below
 * @return size of memory that was deallocated
 */
protected synchronized long seekUnusedZero(Long bucketId, Aggressiveness aggressiveness) {
    final int totalElements = (int) memoryHandler.getAllocatedHostObjects(bucketId);

    // these 2 variables will contain jvm-wise memory access frequencies
    final float shortAverage = zeroShort.getAverage();
    final float longAverage = zeroLong.getAverage();

    // threshold is calculated based on agressiveness specified via configuration
    // (currently only referenced by the disabled debug logging at the end of this method)
    final int divisor = Aggressiveness.values().length - aggressiveness.ordinal();
    final float shortThreshold = shortAverage / divisor;
    final float longThreshold = longAverage / divisor;

    long freedBytes = 0;
    int dropped = 0;
    int survived = 0;

    for (Long object : memoryHandler.getHostTrackingPoints(bucketId)) {
        final AllocationPoint point = getAllocationPoint(object);

        // point can be null, if memory was promoted to device and was deleted there
        if (point == null) {
            continue;
        }

        // only host-side allocations are considered here
        if (point.getAllocationStatus() != AllocationStatus.HOST) {
            continue;
        }

        if (point.getBuffer() != null) {
            survived++;
            continue;
        }

        // The allocation no longer has a buffer attached: delete the reference.
        purgeZeroObject(bucketId, object, point, false);
        freedBytes += AllocationUtils.getRequiredMemory(point.getShape());
        dropped++;
    }

    //log.debug("Short average: ["+shortAverage+"], Long average: [" + longAverage + "]");
    //log.debug("Aggressiveness: ["+ aggressiveness+"]; Short threshold: ["+shortThreshold+"]; Long threshold: [" + longThreshold + "]");
    log.debug("Zero {} elements checked: [{}], deleted: {}, survived: {}", bucketId, totalElements,
                    dropped, survived);

    return freedBytes;
}
 
源代码18 项目: L2jOrg   文件: DeadPartyPledge.java
/**
 * Applies {@code action} to the dead playables around {@code target} that belong to the
 * target's clan or party, up to the skill's affect limit.
 *
 * <p>Nothing happens unless {@code target} is a playable. The target itself is tested
 * first, then every visible playable within the skill's affect range.
 *
 * @param activeChar the caster, passed to the affect-object check
 * @param target the origin of the area
 * @param skill supplies the affect object, range and limit
 * @param action invoked for every playable accepted by the filter
 */
@Override
public void forEachAffected(Creature activeChar, WorldObject target, Skill skill, Consumer<? super WorldObject> action)
{
	if (!isPlayable(target))
	{
		return;
	}
	
	final IAffectObjectHandler affectObject = AffectObjectHandler.getInstance().getHandler(skill.getAffectObject());
	final int affectRange = skill.getAffectRange();
	final int affectLimit = skill.getAffectLimit();
	
	final Playable origin = (Playable) target;
	final Player player = origin.getActingPlayer();
	final Party party = player.getParty();
	
	// Create the target filter.
	final AtomicInteger affected = new AtomicInteger(0);
	final Predicate<Playable> filter = candidate ->
	{
		// A limit of zero or less means "unlimited".
		final boolean limitReached = (affectLimit > 0) && (affected.get() >= affectLimit);
		if (limitReached)
		{
			return false;
		}
		
		final Player p = candidate.getActingPlayer();
		if ((p == null) || !p.isDead())
		{
			return false;
		}
		
		if (p != player)
		{
			// Others qualify through a shared (non-zero) clan, or failing that a shared party leader.
			final boolean sameClan = (p.getClanId() != 0) && (p.getClanId() == player.getClanId());
			if (!sameClan)
			{
				final Party targetParty = p.getParty();
				final boolean sameParty = (party != null) && (targetParty != null) && (party.getLeaderObjectId() == targetParty.getLeaderObjectId());
				if (!sameParty)
				{
					return false;
				}
			}
		}
		
		if ((affectObject != null) && !affectObject.checkAffectedObject(activeChar, p))
		{
			return false;
		}
		
		affected.incrementAndGet();
		return true;
	};
	
	final Consumer<Playable> applyIfAccepted = candidate ->
	{
		if (filter.test(candidate))
		{
			action.accept(candidate);
		}
	};
	
	// Affect object of origin since its skipped in the forEachVisibleObjectInRange method.
	applyIfAccepted.accept(origin);
	
	// Check and add targets.
	World.getInstance().forEachVisibleObjectInRange(origin, Playable.class, affectRange, applyIfAccepted);
}
 
源代码19 项目: grpc-nebula-java   文件: CascadingTest.java
/**
 * Create a chain of client to server calls which can be cancelled top down.
 *
 * <p>Each incoming unary call registers a cancel handler and, until {@code depthThreshold}
 * calls have arrived, issues the same call again through {@code blockingStub} on
 * {@code otherWork}.
 *
 * @param depthThreshold number of nested calls after which the recursion stops
 * @return a Future that completes when call chain is created
 */
private Future<?> startChainingServer(final int depthThreshold) throws IOException {
  final SettableFuture<Void> chainReady = SettableFuture.create();
  final AtomicInteger serversReady = new AtomicInteger();

  class ChainingService extends TestServiceGrpc.TestServiceImplBase {
    @Override
    public void unaryCall(final SimpleRequest request,
        final StreamObserver<SimpleResponse> responseObserver) {
      final ServerCallStreamObserver<SimpleResponse> serverObserver =
          (ServerCallStreamObserver<SimpleResponse>) responseObserver;
      serverObserver.setOnCancelHandler(new Runnable() {
        @Override
        public void run() {
          receivedCancellations.countDown();
        }
      });

      final int reached = serversReady.incrementAndGet();
      if (reached == depthThreshold) {
        // Stop recursion
        chainReady.set(null);
        return;
      }

      // Issue the next hop of the chain; a CANCELLED status is recorded as observed,
      // any other failure is forwarded to this call's observer.
      final Runnable callNextHop = new Runnable() {
        @Override
        public void run() {
          try {
            blockingStub.unaryCall(request);
          } catch (StatusRuntimeException e) {
            if (e.getStatus().getCode() == Status.Code.CANCELLED) {
              observedCancellations.countDown();
            } else {
              responseObserver.onError(e);
            }
          }
        }
      };
      Context.currentContextExecutor(otherWork).execute(callNextHop);
    }
  }

  server = InProcessServerBuilder.forName("channel").executor(otherWork)
      .addService(new ChainingService())
      .build().start();
  return chainReady;
}
 
/**
 * Adds a LocalStatListener for an individual stat. Validates that it
 * receives notifications. Removes the listener and validates that it
 * was in fact removed and no longer receives notifications.
 */
public void testLocalStatListener() throws Exception {
  connect(createGemFireProperties());

  GemFireStatSampler statSampler = getGemFireStatSampler();
  assertTrue(statSampler.waitForInitialization(5000));

  // The listener management methods are looked up reflectively to check that they exist.
  Method getLocalListeners = getGemFireStatSampler().getClass().getMethod("getLocalListeners");
  assertNotNull(getLocalListeners);

  Method addLocalStatListener = getGemFireStatSampler().getClass().getMethod("addLocalStatListener", LocalStatListener.class, Statistics.class, String.class);
  assertNotNull(addLocalStatListener);

  Method removeLocalStatListener = getGemFireStatSampler().getClass().getMethod("removeLocalStatListener", LocalStatListener.class);
  assertNotNull(removeLocalStatListener);

  // validate that there are no listeners
  assertTrue(statSampler.getLocalListeners().isEmpty());

  // add a listener for sampleCount stat in StatSampler statistics
  StatisticsType statSamplerType = getStatisticsManager().findType("StatSampler");
  Statistics[] statsArray = getStatisticsManager().findStatisticsByType(statSamplerType);
  assertEquals(1, statsArray.length);

  final Statistics statSamplerStats = statsArray[0];
  final String statName = "sampleCount";
  final AtomicInteger sampleCountValue = new AtomicInteger(0);
  final AtomicInteger sampleCountChanged = new AtomicInteger(0);

  // Records the latest reported value and how many notifications were received.
  LocalStatListener listener = new LocalStatListener() {
    @Override
    public void statValueChanged(double value) {
      sampleCountValue.set((int)value);
      sampleCountChanged.incrementAndGet();
    }
  };

  statSampler.addLocalStatListener(listener, statSamplerStats, statName);
  assertTrue(statSampler.getLocalListeners().size() == 1);

  // there's a level of indirection here and some protected member fields
  LocalStatListenerImpl lsli = (LocalStatListenerImpl)
      statSampler.getLocalListeners().iterator().next();
  assertEquals("sampleCount", lsli.stat.getName());

  // wait for the listener to update 4 times
  final int expectedChanges = 4;
  final WaitCriterion enoughNotifications = new WaitCriterion() {
    @Override
    public boolean done() {
      return sampleCountChanged.get() >= expectedChanges;
    }
    @Override
    public String description() {
      return "Waiting for sampleCountChanged >= " + expectedChanges;
    }
  };
  DistributedTestCase.waitForCriterion(enoughNotifications, 10000, 10, true);

  // validate that the listener fired and updated the value
  assertTrue(sampleCountValue.get() > 0);
  assertTrue(sampleCountChanged.get() >= expectedChanges);

  // remove the listener and snapshot what it had recorded up to now
  statSampler.removeLocalStatListener(listener);
  final int expectedSampleCountValue = sampleCountValue.get();
  final int expectedSampleCountChanged = sampleCountChanged.get();

  // validate that there are no listeners now
  assertTrue(statSampler.getLocalListeners().isEmpty());

  // wait for 2 stat samples to occur
  // NOTE(review): the threshold is the value the listener last copied from this same stat,
  // so this criterion may already hold on the first poll -- confirm whether waiting for
  // two further samples was intended.
  final WaitCriterion samplesTaken = new WaitCriterion() {
    @Override
    public boolean done() {
      return statSamplerStats.getInt("sampleCount") >= expectedSampleCountValue;
    }
    @Override
    public String description() {
      return "Waiting for sampleCount >= " + expectedSampleCountValue;
    }
  };
  DistributedTestCase.waitForCriterion(samplesTaken, 5000, 10, true);

  // validate that the listener did not fire
  assertEquals(expectedSampleCountValue, sampleCountValue.get());
  assertEquals(expectedSampleCountChanged, sampleCountChanged.get());
}