下面列出了 java.util.concurrent.atomic.AtomicInteger#incrementAndGet() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Checks that a message sent to a topic reaches a non-durable subscriber.
 *
 * <p>The receiver script is evaluated on a background thread. The test waits up to
 * 10 seconds for the shared {@code latch} variable to be released (presumably by the
 * receiver script once it is subscribed - confirm against sendMessages.groovy), then
 * evaluates the sender script and finally asserts that the receiver thread finished
 * without throwing.
 */
@Test
public void testSendReceiveTopicNonDurable() throws Throwable {
    CountDownLatch receiverReady = new CountDownLatch(1);
    setVariable(receiverClassloader, "latch", receiverReady);

    AtomicInteger receiverFailures = new AtomicInteger(0);
    Thread receiverThread = new Thread(() -> {
        try {
            evaluate(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveNonDurableSubscription");
        } catch (Exception e) {
            // Record the failure so the main thread can assert on it after join().
            e.printStackTrace();
            receiverFailures.incrementAndGet();
        }
    });
    receiverThread.start();

    // Do not publish before the latch has been released.
    Assert.assertTrue(receiverReady.await(10, TimeUnit.SECONDS));
    evaluate(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendTopic");

    receiverThread.join();
    Assert.assertEquals(0, receiverFailures.get());
}
/**
 * Performance test for submitting runnables together with a {@code "tags"} flag.
 *
 * <p>Each measured job submits one counting runnable to the execution manager; the
 * completion latch is released once the counter has reached {@code numIterations}.
 */
@Test(groups={"Integration", "Acceptance"})
public void testExecuteRunnableWithTags() {
    final AtomicInteger executed = new AtomicInteger();
    final CountDownLatch allExecuted = new CountDownLatch(1);
    final Map<String, ?> taggedFlags = MutableMap.of("tags", ImmutableList.of("a","b"));

    // The unit of work: count the execution and signal completion on the last one.
    final Runnable countingWork = new Runnable() {
        @Override
        public void run() {
            if (executed.incrementAndGet() >= numIterations) {
                allExecuted.countDown();
            }
        }
    };

    // The measured job: a single tagged submission.
    final Runnable submitJob = new Runnable() {
        @Override
        public void run() {
            executionManager.submit(taggedFlags, countingWork);
        }
    };

    final double minRatePerSec = 1000 * PERFORMANCE_EXPECTATION;
    measure(PerformanceTestDescriptor.create()
            .summary("TaskPerformanceTest.testExecuteRunnableWithTags")
            .iterations(numIterations)
            .minAcceptablePerSecond(minRatePerSec)
            .job(submitJob)
            .completionLatch(allExecuted));
}
/**
 * Updates rows of a settings table on behalf of the calling user.
 *
 * <p>Requests against the favorites table are ignored and report 0 rows. Requests
 * against the global table are always executed as {@code UserHandle.USER_OWNER}.
 * While the database write runs, the per-user in-flight mutation counter (when one
 * is registered for the user) is kept incremented; it is released in a
 * {@code finally} block so that a failing write cannot leave it raised forever.
 *
 * @param url           content URI identifying the table; also used for the change notification
 * @param initialValues column values to write
 * @param where         selection clause
 * @param whereArgs     selection arguments
 * @return the number of rows updated
 */
@Override
public int update(Uri url, ContentValues initialValues, String where, String[] whereArgs) {
    // NOTE: update() is never called by the front-end Settings API, and updates that
    // wind up affecting rows in Secure that are globally shared will not have the
    // intended effect (the update will be invisible to the rest of the system).
    // This should have no practical effect, since writes to the Secure db can only
    // be done by system code, and that code should be using the correct API up front.
    int callingUser = UserHandle.getCallingUserId();
    if (LOCAL_LOGV) Slog.v(TAG, "update() for user " + callingUser);
    SqlArguments args = new SqlArguments(url, where, whereArgs);
    if (TABLE_FAVORITES.equals(args.table)) {
        return 0;
    } else if (TABLE_GLOBAL.equals(args.table)) {
        callingUser = UserHandle.USER_OWNER;
    }
    checkWritePermissions(args);
    checkUserRestrictions(initialValues.getAsString(Settings.Secure.NAME), callingUser);

    final AtomicInteger mutationCount;
    synchronized (this) {
        mutationCount = sKnownMutationsInFlight.get(callingUser);
    }
    if (mutationCount != null) {
        mutationCount.incrementAndGet();
    }
    final int count;
    try {
        DatabaseHelper dbH = getOrEstablishDatabase(callingUser);
        SQLiteDatabase db = dbH.getWritableDatabase();
        count = db.update(args.table, initialValues, args.where, args.args);
    } finally {
        // Always balance the increment above, even when the write throws.
        if (mutationCount != null) {
            mutationCount.decrementAndGet();
        }
    }

    if (count > 0) {
        invalidateCache(callingUser, args.table); // before we notify
        sendNotify(url, callingUser);
    }
    startAsyncCachePopulation(callingUser);
    if (LOCAL_LOGV) Log.v(TAG, args.table + ": " + count + " row(s) <- " + initialValues);
    return count;
}
/**
 * Drains {@code tasks} through its iterator: each task is removed from the queue,
 * counted in {@code taskCount}, and then run on the calling thread.
 *
 * @param tasks     queue of tasks to run; emptied as tasks are taken
 * @param taskCount counter incremented once per task, before the task runs
 */
private static void execute(Queue<RunnableWrapper> tasks, AtomicInteger taskCount) {
    final Iterator<RunnableWrapper> pending = tasks.iterator();
    while (pending.hasNext()) {
        final Runnable current = pending.next();
        // Remove and count before running, so a throwing task is not left in the queue.
        pending.remove();
        taskCount.incrementAndGet();
        current.run();
    }
}
/**
 * A listener registered for "click" is not invoked on registration and is invoked
 * exactly once when a single "click" event is fired through the listener map.
 */
@Test
public void listenerReceivesEvents() {
    AtomicInteger invocations = new AtomicInteger(0);
    Element div = ElementFactory.createDiv();
    DomEventListener countingListener = event -> invocations.incrementAndGet();

    div.addEventListener("click", countingListener);
    // Registering alone must not trigger the listener.
    Assert.assertEquals(0, invocations.get());

    ElementListenerMap listenerMap = div.getNode().getFeature(ElementListenerMap.class);
    listenerMap.fireEvent(new DomEvent(div, "click", Json.createObject()));
    Assert.assertEquals(1, invocations.get());
}
/**
 * Counts the entries of {@code messages} whose body, decoded as text, contains
 * {@code key}.
 *
 * @param count   not used by this implementation
 * @param key     text to look for in each message body
 * @param timeout not used by this implementation
 * @return the number of messages whose body contains {@code key}
 */
protected int consumeMessages(int count, final String key, int timeout) {
    // A plain int is enough: the counter never leaves this method or this thread.
    int matches = 0;
    for (Message message : messages) {
        // NOTE(review): decodes with the platform default charset - confirm this
        // matches the encoding used by the producer.
        String body = new String(message.getBody());
        if (body.contains(key)) {
            matches++;
        }
    }
    return matches;
}
/**
 * Reads JSON documents from one input stream and streams each of them to the feed client.
 *
 * <p>Documents are parsed one after another until the parser signals end of input with an
 * {@link EOFException}. A document for which no ID can be parsed aborts the read with an
 * {@link IllegalArgumentException} whose message quotes at most the first 500 characters
 * of the offending document.
 *
 * @param inputStream source of array of json document.
 * @param feedClient where data is sent.
 * @param numSent counter to be incremented for every document streamed.
 */
public static void read(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) {
    try (InputStreamJsonElementBuffer jsonElementBuffer = new InputStreamJsonElementBuffer(inputStream)) {
        JsonParser parser = new JsonFactory()
                .disable(JsonFactory.Feature.CANONICALIZE_FIELD_NAMES)
                .createParser(jsonElementBuffer);
        for (;;) {
            int startOffset = (int) parser.getCurrentLocation().getCharOffset();
            String docId = parseOneDocument(parser);
            if (docId == null) {
                int endOffset = (int) parser.getCurrentLocation().getCharOffset();
                int length = endOffset - startOffset;
                final int maxTruncatedLength = 500;
                int shown = Math.min(length, maxTruncatedLength);
                StringBuilder excerpt = new StringBuilder(maxTruncatedLength + 3);
                for (int offset = 0; offset < shown; offset++) {
                    excerpt.append(jsonElementBuffer.circular.get(startOffset + offset));
                }
                if (length > maxTruncatedLength) {
                    // Mark that the quoted document was cut short.
                    excerpt.append("...");
                }
                throw new IllegalArgumentException("Document is missing ID: '" + excerpt + "'");
            }
            CharSequence data = jsonElementBuffer.getJsonAsArray(parser.getCurrentLocation().getCharOffset());
            feedClient.stream(docId, data);
            numSent.incrementAndGet();
        }
    } catch (EOFException ignored) {
        // No more documents
    } catch (IOException ioe) {
        System.err.println(ioe.getMessage());
        throw new UncheckedIOException(ioe);
    }
}
/**
 * Cancelling a task that has started but not finished must be reported to the
 * listener as cancelled. The task body waits on a latch that this test never
 * counts down.
 */
@Test
void theWorkerShouldCancelAnInProgressTask() throws InterruptedException {
    TaskId taskId = TaskId.generateTaskId();
    AtomicInteger completions = new AtomicInteger(0);
    CountDownLatch blocker = new CountDownLatch(1);

    Task blockedTask = new MemoryReferenceTask(() -> {
        await(blocker);
        completions.incrementAndGet();
        return Task.Result.COMPLETED;
    });
    TaskWithId submitted = new TaskWithId(taskId, blockedTask);

    Mono<Task.Result> outcome = worker.executeTask(submitted).cache();
    outcome.subscribe();

    // Only cancel once the listener has seen the task start.
    Awaitility.waitAtMost(org.awaitility.Duration.TEN_SECONDS)
        .untilAsserted(() -> verify(listener, atLeastOnce()).started(taskId));

    worker.cancelTask(taskId);
    outcome.block(Duration.ofSeconds(10));

    // Cancellation is signal based and therefore not instantaneous;
    // a short grace period makes the verification below more stable.
    Thread.sleep(50);

    verify(listener, atLeastOnce()).cancelled(taskId, Optional.empty());
    verifyNoMoreInteractions(listener);
}
/**
 * A {@code PerCallService} must create a fresh service instance for every call and
 * close each of them: three calls yield three distinct identity hash codes and,
 * after a short pause, three recorded {@code close()} invocations.
 */
@Test
public void perCallShouldInstantiateMultipleInstances() throws Exception {
    AtomicInteger closedInstances = new AtomicInteger(0);

    // Replies with its own identity hash so the client can tell instances apart.
    class TestService extends GreeterGrpc.GreeterImplBase implements AutoCloseable {
        public TestService() {}

        @Override
        public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
            String identity = Integer.toString(System.identityHashCode(this));
            responseObserver.onNext(HelloResponse.newBuilder().setMessage(identity).build());
            responseObserver.onCompleted();
        }

        @Override
        public void close() {
            closedInstances.incrementAndGet();
        }
    }

    serverRule.getServiceRegistry().addService(new PerCallService<TestService>(() -> new TestService()));
    GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(serverRule.getChannel());

    String firstId = client.sayHello(HelloRequest.getDefaultInstance()).getMessage();
    String secondId = client.sayHello(HelloRequest.getDefaultInstance()).getMessage();
    String thirdId = client.sayHello(HelloRequest.getDefaultInstance()).getMessage();

    assertThat(firstId).isNotEqualTo(secondId);
    assertThat(firstId).isNotEqualTo(thirdId);
    assertThat(secondId).isNotEqualTo(thirdId);

    // let the threads catch up :(
    Thread.sleep(100);
    assertThat(closedInstances.get()).isEqualTo(3);
}
/**
 * A scheduled task whose sub-task cancels its submitter on the fifth run must end
 * with the sub-task having run exactly five times.
 */
@Test
public void testScheduledTaskCancelEnding() throws Exception {
    Duration period = Duration.millis(20);
    BasicExecutionManager manager = new BasicExecutionManager("mycontextid");
    final AtomicInteger runs = new AtomicInteger();

    // Factory invoked by the scheduler for each period; every produced task bumps the counter.
    Callable<Task<?>> taskFactory = new Callable<Task<?>>() {
        @Override
        public Task<?> call() throws Exception {
            return new BasicTask<Integer>(new Callable<Integer>() {
                @Override
                public Integer call() {
                    log.info("task running ("+runs+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
                    ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask();
                    runs.incrementAndGet();
                    if (runs.get() >= 5) {
                        // Stop the schedule from inside the fifth run.
                        submitter.cancel();
                    }
                    return runs.get();
                }
            });
        }
    };
    ScheduledTask scheduled = new ScheduledTask(MutableMap.of("delay", period.times(2), "period", period), taskFactory);

    log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", scheduled, scheduled.getStatusDetail(false));
    manager.submit(scheduled);
    log.info("submitted {} {}", scheduled, scheduled.getStatusDetail(false));

    Integer interimResult = (Integer) scheduled.get();
    log.info("done one ({}) {} {}", new Object[] {interimResult, scheduled, scheduled.getStatusDetail(false)});
    assertTrue(runs.get() > 0);

    scheduled.blockUntilEnded();
    // int finalResult = t.get()
    log.info("ended ({}) {} {}", new Object[] {runs, scheduled, scheduled.getStatusDetail(false)});
    // assertEquals(finalResult, 5)
    assertEquals(runs.get(), 5);
}
/**
 * Deletes rows of a settings table on behalf of the calling user.
 *
 * <p>Requests against the favorites table are ignored and report 0 rows; requests
 * against the old-favorites table are redirected to the favorites table; requests
 * against the global table are always executed as {@code UserHandle.USER_OWNER}.
 * While the database write runs, the per-user in-flight mutation counter (when one
 * is registered for the user) is kept incremented; it is released in a
 * {@code finally} block so that a failing write cannot leave it raised forever.
 *
 * @param url       content URI identifying the table; also used for the change notification
 * @param where     selection clause
 * @param whereArgs selection arguments
 * @return the number of rows deleted
 */
@Override
public int delete(Uri url, String where, String[] whereArgs) {
    int callingUser = UserHandle.getCallingUserId();
    if (LOCAL_LOGV) Slog.v(TAG, "delete() for user " + callingUser);
    SqlArguments args = new SqlArguments(url, where, whereArgs);
    if (TABLE_FAVORITES.equals(args.table)) {
        return 0;
    } else if (TABLE_OLD_FAVORITES.equals(args.table)) {
        args.table = TABLE_FAVORITES;
    } else if (TABLE_GLOBAL.equals(args.table)) {
        callingUser = UserHandle.USER_OWNER;
    }
    checkWritePermissions(args);

    final AtomicInteger mutationCount;
    synchronized (this) {
        mutationCount = sKnownMutationsInFlight.get(callingUser);
    }
    if (mutationCount != null) {
        mutationCount.incrementAndGet();
    }
    final int count;
    try {
        DatabaseHelper dbH = getOrEstablishDatabase(callingUser);
        SQLiteDatabase db = dbH.getWritableDatabase();
        count = db.delete(args.table, args.where, args.args);
    } finally {
        // Always balance the increment above, even when the write throws.
        if (mutationCount != null) {
            mutationCount.decrementAndGet();
        }
    }

    if (count > 0) {
        invalidateCache(callingUser, args.table); // before we notify
        sendNotify(url, callingUser);
    }
    startAsyncCachePopulation(callingUser);
    if (LOCAL_LOGV) Log.v(TAG, args.table + ": " + count + " row(s) deleted");
    return count;
}
/**
 * Verifies that {@code OperatorResumeOnCompleted} keeps resuming the stream until the
 * policy returns {@code null}.
 *
 * <p>The source emits {@code 1..max}; for each of the first {@code repeat} attempts the
 * policy resumes with the single value {@code attempts + max}. The subscriber must
 * therefore see {@code 1..max + repeat}, exactly {@code repeat} resumptions and exactly
 * one {@code onCompleted}.
 */
@Test
public void testCanResumeOnCompletion() throws Exception {
    final int max = 6;
    final Observable<Integer> ints = Observable.range(1, max);
    final int repeat = 5;
    final AtomicInteger retries = new AtomicInteger();
    Operator<Integer, Integer> resumeOperator = new OperatorResumeOnCompleted<>(
            new ResumeOnCompletedPolicy<Integer>() {
                @Override
                public Observable<Integer> call(final Integer attempts) {
                    if (attempts > repeat) {
                        // Returning null ends the resumption.
                        return null;
                    }
                    retries.incrementAndGet();
                    return Observable.just(attempts + max);
                }
            });

    final CountDownLatch done = new CountDownLatch(1);
    final AtomicInteger completionCount = new AtomicInteger();
    final List<Integer> collected = new ArrayList<>();
    // Holds an error delivered to the subscriber so that it can be reported from the
    // test thread; the latch hand-off makes the write visible to the reader.
    final java.util.concurrent.atomic.AtomicReference<Throwable> failure =
            new java.util.concurrent.atomic.AtomicReference<>();
    ints
            .lift(resumeOperator)
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    completionCount.incrementAndGet();
                    done.countDown();
                }

                @Override
                public void onError(Throwable e) {
                    // Record first, then release the latch: failing here directly would
                    // skip the countDown and turn a real error into a timeout.
                    failure.set(e);
                    done.countDown();
                }

                @Override
                public void onNext(Integer integer) {
                    collected.add(integer);
                }
            });

    final long timeoutSecs = 5;
    if (!done.await(timeoutSecs, TimeUnit.SECONDS)) {
        fail("Should finish within " + timeoutSecs + " seconds");
    }
    if (failure.get() != null) {
        throw new AssertionError("There should be no error at all", failure.get());
    }

    assertEquals(String.format("There should be exactly %d retries", repeat), repeat, retries.get());
    assertEquals("There should be exactly one onCompleted call", 1, completionCount.get());
    List<Integer> expected = Observable.range(1, max + repeat).toList().toBlocking().first();
    assertEquals("The collected should include the original stream plus every attempt", expected, collected);
}
/**
 * Processes the given processor/exchange pairs one after another on the calling thread.
 *
 * <p>For each pair the sub-exchange is prepared via {@code updateNewExchange} with the
 * current index and then handed to the per-pair {@code doProcessSequential} overload.
 * If that overload reports that the pair was not completed synchronously, this method
 * returns {@code false} immediately. If {@code stopOnException} is set and
 * {@code PipelineHelper.continueProcessing} says not to continue, the sub-exchange
 * (with its exception wrapped in a {@code CamelExchangeException}, when it has one) is
 * stored in {@code result} and the method returns {@code true}. Otherwise the
 * sub-exchange is aggregated into {@code result} and the index is advanced.
 *
 * @param original the exchange passed through to the per-pair overload
 * @param result   holder that receives the aggregated (or failed) exchange
 * @param pairs    the processor/exchange pairs to process in iteration order
 * @param callback callback passed through to the per-pair overload
 * @return {@code false} if a pair continues asynchronously, {@code true} otherwise
 * @throws Exception declared by this method; presumably propagated from the helpers it calls
 */
protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception {
// running index of the pair being processed; also used in log and error messages
AtomicInteger total = new AtomicInteger();
Iterator<ProcessorExchangePair> it = pairs.iterator();
while (it.hasNext()) {
ProcessorExchangePair pair = it.next();
Exchange subExchange = pair.getExchange();
updateNewExchange(subExchange, total.get(), pairs, it);
boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total);
if (!sync) {
if (LOG.isTraceEnabled()) {
LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", pair.getExchange().getExchangeId());
}
// the remainder of the multicast will be completed async
// so we break out now, then the callback will be invoked which then continue routing from where we left here
return false;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Processing exchangeId: {} is continued being processed synchronously", pair.getExchange().getExchangeId());
}
// Decide whether to continue with the multicast or not; similar logic to the Pipeline
// remember to test for stop on exception and aggregate before copying back results
boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG);
if (stopOnException && !continueProcessing) {
if (subExchange.getException() != null) {
// wrap in exception to explain where it failed
CamelExchangeException cause = new CamelExchangeException("Sequential processing failed for number " + total.get(), subExchange, subExchange.getException());
subExchange.setException(cause);
}
// we want to stop on exception, and the exception was handled by the error handler
// this is similar to what the pipeline does, so we should do the same to not surprise end users
// so we should set the failed exchange as the result and be done
result.set(subExchange);
return true;
}
LOG.trace("Sequential processing complete for number {} exchange: {}", total, subExchange);
// aggregate this sub-exchange into the result, using the variant selected by parallelAggregate
if (parallelAggregate) {
doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
} else {
doAggregate(getAggregationStrategy(subExchange), result, subExchange);
}
// advance the index only after the pair has been fully processed and aggregated
total.incrementAndGet();
}
LOG.debug("Done sequential processing {} exchanges", total);
return true;
}
/**
 * Single-stripe LRU test. With {@code lruStripes = 1} and {@code mem = 10}, every
 * insert of a 100-byte value is expected to be evicted right away: after each insert
 * exactly one eviction must have been reported and the map must be empty.
 */
@Test
public void testLru1() {
    lruStripes = 1;
    mem = 10;

    final AtomicInteger evictions = new AtomicInteger();
    evictLsnr = new GridOffHeapEvictListener() {
        @Override
        public void onEvict(int part, int hash, byte[] k, byte[] v) {
            String evictedKey = new String(k);
            info("Evicted key: " + evictedKey);
            evictions.incrementAndGet();
        }

        @Override
        public boolean removeEvicted() {
            return true;
        }
    };

    map = newMap();
    for (int part = 0; part < parts; part++) {
        for (int attempt = 0; attempt < 10; attempt++) {
            String key = string();
            byte[] rawKey = key.getBytes();
            byte[] rawValue = bytes(100);
            map.insert(part, hash(key), rawKey, rawValue);

            info("Evicted: " + evictions);
            assertEquals(1, evictions.get());
            assertEquals(0, map.size());
            // Reset the counter so the next insert is checked on its own.
            assertTrue(evictions.compareAndSet(1, 0));
        }
    }
}
/**
 * Counts one more occurrence of {@code msg} and reports whether it may still be logged.
 *
 * @param msg the message about to be logged; used as the cache key
 * @return {@code true} while the message has been seen at most {@code maxRepetitions} times
 */
@SuppressWarnings("ConstantConditions")
private boolean shouldLog(final String msg) {
    // A counter is created on first sight of the message and reused afterwards.
    final AtomicInteger seen = messageCountCache.get(msg, key -> new AtomicInteger(0));
    final int occurrences = seen.incrementAndGet();
    return occurrences <= maxRepetitions;
}
/**
 * Three writer threads each add 15 documents to one shared write batcher (batch size
 * 99, 10 batcher threads) and flush; afterwards the database must contain 45 documents.
 *
 * <p>A fourth thread sleeps for 15 seconds and then prints and counts the live threads
 * whose name contains {@code "pool-1-thread-"}. The count is currently only collected;
 * the assertion on it is commented out.
 */
@Ignore
public void testAddMultiThreadedLessDocsSuccess() throws Exception {
    System.out.println("In testAddMultiThreadedLessDocsSuccess method");
    final String query1 = "fn:count(fn:doc())";
    final AtomicInteger count = new AtomicInteger(0);
    ihbMT = dmManager.newWriteBatcher();
    ihbMT.withBatchSize(99);
    ihbMT.withThreadCount(10);
    // ihbMT.withTransactionSize(3);
    ihbMT.onBatchSuccess(batch -> {
    }).onBatchFailure((batch, throwable) -> {
        throwable.printStackTrace();
    });
    dmManager.startJob(ihbMT);

    // Adds 15 documents with thread-specific URIs, then flushes the batcher.
    class MyRunnable implements Runnable {
        @Override
        public void run() {
            for (int j = 0; j < 15; j++) {
                String uri = "/local/json-" + j + "-" + Thread.currentThread().getId();
                System.out.println("Thread name: " + Thread.currentThread().getName() + " URI:" + uri);
                ihbMT.add(uri, fileHandle);
            }
            ihbMT.flushAndWait();
        }
    }

    // Counts the threads named like "pool-1-thread-*" after a fixed delay.
    class CountRunnable implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(15000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            for (Thread t : Thread.getAllStackTraces().keySet()) {
                // Only matching threads are printed AND counted; previously the
                // increment sat outside the if and counted every thread.
                if (t.getName().contains("pool-1-thread-")) {
                    System.out.println(t.getName());
                    count.incrementAndGet();
                }
            }
        }
    }

    Thread countT = new Thread(new CountRunnable());
    Thread t1 = new Thread(new MyRunnable());
    Thread t2 = new Thread(new MyRunnable());
    Thread t3 = new Thread(new MyRunnable());
    countT.start();
    t1.start();
    t2.start();
    t3.start();
    countT.join();
    t1.join();
    t2.join();
    t3.join();

    // Assert.assertTrue(count.intValue()==10);
    Assert.assertTrue(dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue() == 45);
    clearDB(port);
}
/**
 * This method seeks for unused zero-copy memory allocations
 *
 * <p>Every host tracking point of the bucket is inspected. Points that no longer
 * resolve to an allocation, or whose status is not {@code HOST}, are skipped. A
 * {@code HOST} point without a buffer is purged and its required memory is added to
 * the returned total; a {@code HOST} point with a buffer is counted as survived.
 *
 * @param bucketId Id of the bucket, serving allocations
 * @param aggressiveness configured aggressiveness; only feeds thresholds that are
 *                       computed here but not consulted by the current implementation
 * @return size of memory that was deallocated
 */
protected synchronized long seekUnusedZero(Long bucketId, Aggressiveness aggressiveness) {
    long freedBytes = 0;
    int totalElements = (int) memoryHandler.getAllocatedHostObjects(bucketId);

    // these 2 variables will contain jvm-wise memory access frequencies
    float shortAverage = zeroShort.getAverage();
    float longAverage = zeroLong.getAverage();

    // threshold is calculated based on agressiveness specified via configuration
    // (currently not used by the loop below)
    int divisor = Aggressiveness.values().length - aggressiveness.ordinal();
    float shortThreshold = shortAverage / divisor;
    float longThreshold = longAverage / divisor;

    // simple counters for dereferenced and still-referenced objects
    int dropped = 0;
    int survived = 0;

    for (Long object : memoryHandler.getHostTrackingPoints(bucketId)) {
        AllocationPoint point = getAllocationPoint(object);

        // point can be null, if memory was promoted to device and was deleted there
        if (point == null) {
            continue;
        }
        if (point.getAllocationStatus() != AllocationStatus.HOST) {
            continue;
        }
        if (point.getBuffer() != null) {
            survived++;
            continue;
        }

        // The point has no buffer any more: delete the reference and account for
        // the memory it required.
        purgeZeroObject(bucketId, object, point, false);
        freedBytes += AllocationUtils.getRequiredMemory(point.getShape());
        dropped++;
    }

    log.debug("Zero {} elements checked: [{}], deleted: {}, survived: {}", bucketId, totalElements,
            dropped, survived);
    return freedBytes;
}
/**
 * Applies {@code action} to the playable target and to the visible playables in the
 * skill's affect range that pass the filter.
 *
 * <p>A candidate passes when the affect limit (if positive) is not yet reached, its
 * acting player exists and is dead, it is the target's own player or shares a non-zero
 * clan id with it or is in a party with the same leader, and the skill's affect-object
 * handler (if any) accepts it. Nothing happens when {@code target} is not playable.
 */
@Override
public void forEachAffected(Creature activeChar, WorldObject target, Skill skill, Consumer<? super WorldObject> action)
{
    final IAffectObjectHandler affectObject = AffectObjectHandler.getInstance().getHandler(skill.getAffectObject());
    final int affectRange = skill.getAffectRange();
    final int affectLimit = skill.getAffectLimit();

    if (!isPlayable(target))
    {
        return;
    }

    final Playable origin = (Playable) target;
    final Player originPlayer = origin.getActingPlayer();
    final Party originParty = originPlayer.getParty();

    // Create the target filter; the counter tracks how many candidates were accepted.
    final AtomicInteger accepted = new AtomicInteger(0);
    final Predicate<Playable> filter = candidate ->
    {
        final boolean limitReached = (affectLimit > 0) && (accepted.get() >= affectLimit);
        if (limitReached)
        {
            return false;
        }

        final Player candidatePlayer = candidate.getActingPlayer();
        if ((candidatePlayer == null) || !candidatePlayer.isDead())
        {
            return false;
        }

        // Anyone other than the origin's own player and outside its clan must share its party.
        if ((candidatePlayer != originPlayer) && ((candidatePlayer.getClanId() == 0) || (candidatePlayer.getClanId() != originPlayer.getClanId())))
        {
            final Party candidateParty = candidatePlayer.getParty();
            if ((originParty == null) || (candidateParty == null) || (originParty.getLeaderObjectId() != candidateParty.getLeaderObjectId()))
            {
                return false;
            }
        }

        if ((affectObject != null) && !affectObject.checkAffectedObject(activeChar, candidatePlayer))
        {
            return false;
        }

        accepted.incrementAndGet();
        return true;
    };

    // Affect object of origin since its skipped in the forEachVisibleObjectInRange method.
    if (filter.test(origin))
    {
        action.accept(origin);
    }

    // Check and add targets.
    World.getInstance().forEachVisibleObjectInRange(origin, Playable.class, affectRange, nearby ->
    {
        if (filter.test(nearby))
        {
            action.accept(nearby);
        }
    });
}
/**
 * Create a chain of client to server calls which can be cancelled top down.
 *
 * <p>Each incoming unary call registers a cancel handler and, until
 * {@code depthThreshold} calls have been received, issues a further blocking call
 * through {@code blockingStub} on the {@code otherWork} executor. The call that
 * reaches the threshold completes the returned future instead of recursing.
 *
 * @param depthThreshold number of received calls at which the recursion stops
 * @return a Future that completes when call chain is created
 * @throws IOException if the in-process server cannot be started
 */
private Future<?> startChainingServer(final int depthThreshold) throws IOException {
    final SettableFuture<Void> chainReady = SettableFuture.create();
    final AtomicInteger callsReceived = new AtomicInteger();

    class ChainingService extends TestServiceGrpc.TestServiceImplBase {
        @Override
        public void unaryCall(final SimpleRequest request,
                final StreamObserver<SimpleResponse> responseObserver) {
            final Runnable countCancellation = new Runnable() {
                @Override
                public void run() {
                    receivedCancellations.countDown();
                }
            };
            ((ServerCallStreamObserver) responseObserver).setOnCancelHandler(countCancellation);

            final boolean depthReached = callsReceived.incrementAndGet() == depthThreshold;
            if (depthReached) {
                // Stop recursion
                chainReady.set(null);
                return;
            }

            // Issue the next call of the chain; a CANCELLED status is counted, any
            // other failure is forwarded to the caller of this call.
            final Runnable callNextServer = new Runnable() {
                @Override
                public void run() {
                    try {
                        blockingStub.unaryCall(request);
                    } catch (StatusRuntimeException e) {
                        Status status = e.getStatus();
                        if (status.getCode() == Status.Code.CANCELLED) {
                            observedCancellations.countDown();
                        } else {
                            responseObserver.onError(e);
                        }
                    }
                }
            };
            Context.currentContextExecutor(otherWork).execute(callNextServer);
        }
    }

    server = InProcessServerBuilder.forName("channel").executor(otherWork)
            .addService(new ChainingService())
            .build().start();
    return chainReady;
}
/**
 * Adds a LocalStatListener for an individual stat. Validates that it
 * receives notifications. Removes the listener and validates that it
 * was in fact removed and no longer receives notifications.
 *
 * <p>After removal the test waits until the live {@code sampleCount} statistic has
 * advanced by two further samples before checking that the listener stayed silent;
 * waiting on the value last seen by the listener would be satisfied immediately.
 */
public void testLocalStatListener() throws Exception {
    connect(createGemFireProperties());

    GemFireStatSampler statSampler = getGemFireStatSampler();
    assertTrue(statSampler.waitForInitialization(5000));

    // the listener API must be present on the sampler class
    Method getLocalListeners = getGemFireStatSampler().getClass().getMethod("getLocalListeners");
    assertNotNull(getLocalListeners);

    Method addLocalStatListener = getGemFireStatSampler().getClass().getMethod("addLocalStatListener", LocalStatListener.class, Statistics.class, String.class);
    assertNotNull(addLocalStatListener);

    Method removeLocalStatListener = getGemFireStatSampler().getClass().getMethod("removeLocalStatListener", LocalStatListener.class);
    assertNotNull(removeLocalStatListener);

    // validate that there are no listeners
    assertTrue(statSampler.getLocalListeners().isEmpty());

    // add a listener for sampleCount stat in StatSampler statistics
    StatisticsType statSamplerType = getStatisticsManager().findType("StatSampler");
    Statistics[] statsArray = getStatisticsManager().findStatisticsByType(statSamplerType);
    assertEquals(1, statsArray.length);

    final Statistics statSamplerStats = statsArray[0];
    final String statName = "sampleCount";
    final AtomicInteger sampleCountValue = new AtomicInteger(0);
    final AtomicInteger sampleCountChanged = new AtomicInteger(0);

    LocalStatListener listener = new LocalStatListener() {
        public void statValueChanged(double value) {
            sampleCountValue.set((int)value);
            sampleCountChanged.incrementAndGet();
        }
    };

    statSampler.addLocalStatListener(listener, statSamplerStats, statName);
    assertEquals(1, statSampler.getLocalListeners().size());

    // there's a level of indirection here and some protected member fields
    LocalStatListenerImpl lsli = (LocalStatListenerImpl)
        statSampler.getLocalListeners().iterator().next();
    assertEquals("sampleCount", lsli.stat.getName());

    // wait for the listener to update 4 times
    final int expectedChanges = 4;
    WaitCriterion wc = new WaitCriterion() {
        public boolean done() {
            return sampleCountChanged.get() >= expectedChanges;
        }
        public String description() {
            return "Waiting for sampleCountChanged >= " + expectedChanges;
        }
    };
    DistributedTestCase.waitForCriterion(wc, 10000, 10, true);

    // validate that the listener fired and updated the value
    assertTrue(sampleCountValue.get() > 0);
    assertTrue(sampleCountChanged.get() >= expectedChanges);

    // remove the listener
    statSampler.removeLocalStatListener(listener);
    final int expectedSampleCountValue = sampleCountValue.get();
    final int expectedSampleCountChanged = sampleCountChanged.get();

    // validate that there are no listeners now
    assertTrue(statSampler.getLocalListeners().isEmpty());

    // wait for 2 further stat samples to occur, measured from the statistic's current
    // value so that the sampler really has had a chance to notify a stale listener
    final int targetSampleCount = statSamplerStats.getInt("sampleCount") + 2;
    wc = new WaitCriterion() {
        public boolean done() {
            return statSamplerStats.getInt("sampleCount") >= targetSampleCount;
        }
        public String description() {
            return "Waiting for sampleCount >= " + targetSampleCount;
        }
    };
    DistributedTestCase.waitForCriterion(wc, 5000, 10, true);

    // validate that the listener did not fire
    assertEquals(expectedSampleCountValue, sampleCountValue.get());
    assertEquals(expectedSampleCountChanged, sampleCountChanged.get());
}