下面列出了 java.util.concurrent.atomic.AtomicInteger#decrementAndGet() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Applies a counter operation for the current test and returns the resulting value.
 *
 * <p>{@code opId} 1 increments and {@code opId} 2 decrements the counter registered under
 * {@code testName}; the mutation only happens when {@code args[0]} is {@code true}, otherwise
 * the current value is read unchanged.
 *
 * @param opId operation selector (1 = increment, 2 = decrement)
 * @param args {@code args[0]} must be a boolean telling whether the counter is modified
 * @return an already-completed future holding the counter value as an {@code Integer}
 * @throws SamzaException if {@code opId} is neither 1 nor 2
 */
@Override
public CompletableFuture writeAsync(int opId, Object... args) {
  final AtomicInteger current = counters.get(testName);
  final boolean mutate = (boolean) args[0];
  final int value;
  if (opId == 1) {
    value = mutate ? current.incrementAndGet() : current.get();
  } else if (opId == 2) {
    value = mutate ? current.decrementAndGet() : current.get();
  } else {
    throw new SamzaException("Invalid opId: " + opId);
  }
  return CompletableFuture.completedFuture(Integer.valueOf(value));
}
/**
 * Repeatedly drains pending work for {@code child} until the {@code wip} counter reaches zero.
 *
 * <p>Each pass calls {@code drainIfPossible(...)} once and then decrements {@code wip}. If more
 * than one unit is still outstanding after the decrement, the counter is collapsed to 1 so that
 * exactly one more pass is made instead of one pass per accumulated increment.
 *
 * @param child subscriber handed to {@code drainIfPossible}
 * @param requested passed through to {@code drainIfPossible}
 * @param bufferedCount passed through to {@code drainIfPossible}
 * @param onCompleteReceived passed through to {@code drainIfPossible}
 * @param completionEmitted passed through to {@code drainIfPossible}
 * @param wip counter that keeps this loop running while it is greater than zero
 */
private void pollQueue(final Subscriber<? super T> child,
AtomicLong requested,
AtomicInteger bufferedCount,
AtomicBoolean onCompleteReceived,
AtomicInteger completionEmitted,
AtomicInteger wip) {
do {
drainIfPossible(child, requested, bufferedCount, onCompleteReceived, completionEmitted);
// NOTE(review): decrementAndGet() returns an int; it is widened to long here.
long c = wip.decrementAndGet();
if (c > 1) {
/*
 * Set the counter down to 1 and then iterate again.
 * We lower it to 1 because it could have grown very large during the last poll loop,
 * and we would otherwise loop that many times here before exiting, even though
 * everything has already been drained.
 */
wip.set(1);
// We now loop again; if anything schedules more work after this point it will increment
// the counter and cause yet another pass afterwards.
}
} while (wip.get() > 0);
}
/**
 * Builds a consumer that forwards individual results to an {@code rx.Observer}.
 *
 * <p>Successful results are emitted via {@code onNext}. Every result that is not handled in
 * fail-fast mode decrements {@code counter}; when it reaches zero the observer is completed.
 * In fail-fast mode ({@code failOnError == true}) a failed result signals {@code onError} at
 * most once (guarded by {@code errorTrigger}) and resets {@code counter} to zero.
 *
 * @param observer target of the emitted signals
 * @param counter number of results still expected
 * @param errorTrigger flag ensuring {@code onError} is signalled only once
 * @param failOnError whether a failed result terminates the observer with an error
 * @return the consumer to be invoked once per result
 */
private static <T> Consumer<T> provideObserverResult(final rx.Observer<? super T> observer,
    final AtomicInteger counter,
    final AtomicBoolean errorTrigger,
    final boolean failOnError) {
  return result -> {
    final boolean succeeded = result.isSuccess();

    // Fail-fast path: report the first error only and stop counting.
    if (!succeeded && failOnError) {
      if (errorTrigger.compareAndSet(false, true)) {
        observer.onError(result.getError());
      }
      counter.set(0);
      return;
    }

    if (succeeded) {
      observer.onNext(result.getValue());
    }
    // Both a success and a tolerated failure consume one expected result.
    if (counter.decrementAndGet() == 0) {
      observer.onCompleted();
    }
  };
}
/**
 * Starts one "lookup for self" task per RPC server, seeded with the given entries, and tracks
 * the bootstrap state while those tasks are running.
 *
 * <p>If the routing table is empty and no entries are supplied, nothing is started and the
 * state is reset to {@code NONE}. Once the last started task reports back, the state returns to
 * {@code NONE}, {@code lastBootstrap} is updated and, if still running with enough routing
 * table entries, the remaining buckets are filled.
 *
 * @param entries candidate nodes injected into every lookup task
 */
void fillHomeBuckets(Collection<KBucketEntry> entries) {
  if (node.getNumEntriesInRoutingTable() == 0 && entries.isEmpty()) {
    bootstrapping.set(BootstrapState.NONE);
    return;
  }

  bootstrapping.set(BootstrapState.BOOTSTRAP);

  final AtomicInteger pendingLookups = new AtomicInteger();

  TaskListener onLookupFinished = finishedTask -> {
    if (pendingLookups.decrementAndGet() != 0) {
      return;
    }
    bootstrapping.set(BootstrapState.NONE);
    lastBootstrap = System.currentTimeMillis();
    // fill the remaining buckets once all bootstrap operations finished
    if (running && node.getNumEntriesInRoutingTable() > DHTConstants.USE_BT_ROUTER_IF_LESS_THAN_X_PEERS) {
      node.fillBuckets();
    }
  };

  for (RPCServer srv : serverManager.getAllServers()) {
    findNode(srv.getDerivedID(), true, true, srv, lookup -> {
      pendingLookups.incrementAndGet();
      lookup.setInfo("Bootstrap: lookup for self");
      lookup.injectCandidates(entries);
      lookup.addListener(onLookupFinished);
    });
  }

  // No lookup is outstanding at this point, so leave the bootstrap state again.
  if (pendingLookups.get() == 0) {
    bootstrapping.set(BootstrapState.NONE);
  }
}
/**
 * Sends {@code command} to every entry returned by the connection manager and gathers all
 * replies into {@code results}.
 *
 * <p>A reply that is itself a {@link Collection} is added element by element; any other reply is
 * added as a single element. The returned promise succeeds with {@code results} once every node
 * has answered, and fails with the first error that is not a {@code RedisRedirectException}.
 *
 * @param results collection receiving the replies; it is also used as the lock for additions
 * @param codec codec passed to each per-node call
 * @param command command executed on each node
 * @param params command parameters
 * @return promise completed with {@code results} or failed with the first non-redirect error
 */
@Override
public <T, R> RFuture<Collection<R>> readAllAsync(Collection<R> results, Codec codec, RedisCommand<T> command, Object... params) {
  RPromise<Collection<R>> aggregate = createPromise();
  Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
  AtomicInteger pending = new AtomicInteger(entries.size());

  BiConsumer<Object, Throwable> onNodeReply = (reply, failure) -> {
    boolean fatal = failure != null && !(failure instanceof RedisRedirectException);
    if (fatal) {
      aggregate.tryFailure(failure);
      return;
    }

    synchronized (results) {
      if (reply instanceof Collection) {
        results.addAll((Collection) reply);
      } else {
        results.add((R) reply);
      }
    }

    if (pending.decrementAndGet() == 0 && !aggregate.isDone()) {
      aggregate.trySuccess(results);
    }
  };

  for (MasterSlaveEntry entry : entries) {
    RPromise<R> nodePromise = new RedissonPromise<R>();
    nodePromise.onComplete(onNodeReply);
    async(true, new NodeSource(entry), codec, command, params, nodePromise, true);
  }
  return aggregate;
}
/**
 * Schedules a task on a single-threaded scheduler's worker and checks that it has run by the
 * time the test thread has waited 1200 ms.
 */
@Test
public void immediateTaskIsExecuted() throws Exception {
  final Scheduler scheduler = Schedulers.newSingle("rbWork");
  final Scheduler.Worker worker = scheduler.createWorker();

  final long startedAt = System.currentTimeMillis();
  final AtomicInteger remaining = new AtomicInteger(1);
  final String event = "Hello World!";

  worker.schedule(() -> {
    // Mark the event as seen first, then simulate one second of work.
    remaining.decrementAndGet();
    try {
      System.out.println("ev: " + event);
      Thread.sleep(1000);
    } catch (InterruptedException ie) {
      throw Exceptions.propagate(ie);
    }
  });

  Thread.sleep(1200);
  final long finishedAt = System.currentTimeMillis();

  scheduler.dispose();

  Assert.assertTrue("Event missed", remaining.intValue() == 0);
  Assert.assertTrue("Timeout too long", (finishedAt - startedAt) >= 1000);
}
/**
 * Recursively removes elements from {@code previousLevel} and tests the reduced lists against
 * {@code predicate}.
 *
 * <p>At {@code level == 0} each list with one element removed is tested directly and every test
 * consumes one unit of {@code maxIterations}. At higher levels the reduced list is handed to the
 * next lower level, starting at the index of the element just removed.
 *
 * @param targetValue value passed as first argument to {@code predicate}
 * @param previousLevel list to remove elements from; it is never modified
 * @param predicate test applied to a candidate list
 * @param level number of further removal levels below this one
 * @param permutationIndex first index considered for removal at this level
 * @param maxIterations remaining budget of predicate evaluations, shared across the recursion
 * @return the first candidate list accepted by the predicate, or an empty list if none was found
 *         or the budget ran out
 */
@NonNull
private static <T, R> List<T> checkLevel(R targetValue,
    List<T> previousLevel,
    BiFunction<R, List<T>, Boolean> predicate,
    int level,
    int permutationIndex,
    AtomicInteger maxIterations) {
  if (previousLevel.size() == 1) {
    return new ArrayList<>();
  }

  int index = permutationIndex;
  // Stop as soon as either the list is exhausted or the iteration budget is used up.
  while (index < previousLevel.size() && maxIterations.get() > 0) {
    List<T> candidate = new ArrayList<>(previousLevel);
    candidate.remove(index);

    if (level != 0) {
      // Descend one level, continuing from the current removal index.
      List<T> deeper = checkLevel(targetValue, candidate, predicate, level - 1, index, maxIterations);
      if (!deeper.isEmpty()) {
        return deeper;
      }
    } else {
      maxIterations.decrementAndGet();
      if (predicate.apply(targetValue, candidate)) {
        return candidate;
      }
    }
    index++;
  }
  return new ArrayList<>();
}
/**
 * Folds the outcome of one per-slot future into the aggregate promise.
 *
 * <p>A failed future fails {@code mainPromise} with its cause. A successful one decrements
 * {@code slots}; when the counter reaches zero {@code mainPromise} is completed with {@code null}.
 *
 * @param mainPromise aggregate promise
 * @param slots number of slot operations still outstanding
 * @param future the finished per-slot operation
 */
protected void handle(RPromise<Void> mainPromise, AtomicInteger slots, RFuture<?> future) {
  if (!future.isSuccess()) {
    mainPromise.tryFailure(future.cause());
    return;
  }
  final int outstanding = slots.decrementAndGet();
  if (outstanding == 0) {
    mainPromise.trySuccess(null);
  }
}
/**
 * Records that a bundle of a namespace is no longer owned and, when no owned bundle remains
 * (or none was tracked), releases the cached state of that namespace.
 *
 * <p>Cleanup only happens if a reader future was cached for the namespace: the reader is closed
 * asynchronously and the bundle counter, the init marker and all cached policies of the
 * namespace are removed.
 *
 * @param namespaceBundle the bundle that is being released
 * @return an already-completed future
 */
@Override
public CompletableFuture<Void> removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
  final NamespaceName ns = namespaceBundle.getNamespaceObject();
  final AtomicInteger remaining = ownedBundlesCountPerNamespace.get(ns);

  final boolean stillOwned = remaining != null && remaining.decrementAndGet() > 0;
  if (stillOwned) {
    return CompletableFuture.completedFuture(null);
  }

  final CompletableFuture<SystemTopicClient.Reader> cachedReader = readerCaches.remove(ns);
  if (cachedReader == null) {
    return CompletableFuture.completedFuture(null);
  }

  cachedReader.thenAccept(SystemTopicClient.Reader::closeAsync);
  ownedBundlesCountPerNamespace.remove(ns);
  policyCacheInitMap.remove(ns);
  policiesCache.entrySet().removeIf(cached -> cached.getKey().getNamespaceObject().equals(ns));
  return CompletableFuture.completedFuture(null);
}
/**
 * Submits one prefetch task per key that is not yet present and notifies the listener once
 * after all submitted tasks have finished.
 *
 * <p>Completion is tracked with a counter that starts at 2. Every successfully submitted task
 * adds one, every finished task subtracts one, and this method subtracts the initial 2 when it
 * is done submitting. Whichever decrement brings the counter to zero calls
 * {@code onCompleted()}.
 *
 * @param _keys keys to prefetch
 * @param l listener to notify; when {@code null} a dummy listener is used instead
 */
@Override
public void prefetchAll(final Iterable<? extends K> _keys, final CacheOperationCompletionListener l) {
final CacheOperationCompletionListener _listener= l != null ? l : DUMMY_LOAD_COMPLETED_LISTENER;
// Without a loader there is nothing to prefetch; report completion right away.
if (loader == null) {
_listener.onCompleted();
return;
}
Set<K> _keysToLoad = checkAllPresent(_keys);
// NOTE(review): the initial bias of 2 presumably keeps the counter above zero when a task
// finishes (and decrements) before the matching incrementAndGet() below has run — confirm.
final AtomicInteger _count = new AtomicInteger(2);
try {
for (K k : _keysToLoad) {
final K key = k;
Runnable r = new RunWithCatch(this) {
@Override
public void action() {
try {
getEntryInternal(key);
} finally {
// Count this task as finished even if loading the entry threw.
if (_count.decrementAndGet() == 0) {
_listener.onCompleted();
}
}
}
};
try {
getPrefetchExecutor().execute(r);
// Only tasks that were actually accepted by the executor are counted.
_count.incrementAndGet();
} catch (RejectedExecutionException ignore) { }
}
} finally {
// Remove the initial bias; if every submitted task has already finished, complete here.
if (_count.addAndGet(-2) == 0) {
_listener.onCompleted();
}
}
}
/**
 * Publishes a cluster state from node1 with a single-node voting configuration and marks n2 as
 * faulty at a randomly chosen step before the commit phase, then checks that the publication
 * still completes and commits and that the only acknowledged error is for n2.
 */
public void testClusterStatePublishingWithFaultyNodeBeforeCommit() throws InterruptedException {
VotingConfiguration singleNodeConfig = new VotingConfiguration(Sets.newHashSet(n1.getId()));
initializeCluster(singleNodeConfig);
AssertingAckListener ackListener = new AssertingAckListener(nodes.size());
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build();
AtomicInteger remainingActions = new AtomicInteger(4); // number of publish actions + initial faulty nodes injection
// Pick the step (counted down via remainingActions) at which n2 is reported as faulty.
int injectFaultAt = randomInt(remainingActions.get() - 1);
logger.info("Injecting fault at: {}", injectFaultAt);
// First possible injection point: n2 is already faulty when the publication is created.
Set<DiscoveryNode> initialFaultyNodes = remainingActions.decrementAndGet() == injectFaultAt ?
Collections.singleton(n2) : Collections.emptySet();
MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L,
discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, initialFaultyNodes);
// Handle the pending publish requests in shuffled order; each one is another injection point.
publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> {
if (remainingActions.decrementAndGet() == injectFaultAt) {
publication.onFaultyNode(n2);
}
// Every node other than n2 always responds; n2 responds only on a random coin flip.
if (e.getKey().equals(n2) == false || randomBoolean()) {
PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest(
publication.publishRequest);
e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty()));
}
});
// Apply the commit on every node that has a pending commit, again in shuffled order.
publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> {
nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit);
e.getValue().onResponse(TransportResponse.Empty.INSTANCE);
});
assertTrue(publication.completed);
assertTrue(publication.committed);
publication.onFaultyNode(randomFrom(n1, n3)); // has no influence
// Exactly one error is expected, for n2, mentioning the faulty node.
List<Tuple<DiscoveryNode, Throwable>> errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS);
assertThat(errors.size(), equalTo(1));
assertThat(errors.get(0).v1(), equalTo(n2));
assertThat(errors.get(0).v2().getMessage(), containsString("faulty node"));
}
/**
 * Computes the {@code INDEPENDANT_FREQUENCY} and {@code INDEPENDANCE} properties of all terms.
 *
 * <p>Steps: (1) initialise the independent frequency of each term with its raw frequency,
 * (2) compute term depths, (3) walking from the greatest depth down to 1, subtract each term's
 * independent frequency from that of its bases, and (4) store the ratio of independent frequency
 * to raw frequency as the independence score.
 */
@Override
public void execute() {
  checkNoCycleInExtensions();

  // Step 1: every term starts with its raw frequency as independent frequency.
  logger.debug("Init INDEPENDANT_FREQUENCY for all terms");
  terminology.terms().forEach(term ->
      term.setProperty(TermProperty.INDEPENDANT_FREQUENCY, term.getFrequency()));

  // Step 2: depths are needed to process the terms deepest-first.
  logger.debug("Computing DEPTH property for all terms");
  final AtomicInteger depth = setDepths();
  logger.debug("Depth of terminology is {}", depth.intValue());

  // Step 3: for each depth level, deduct the term's share from all of its bases.
  logger.debug("Computing INDEPENDANT_FREQUENCY by for all terms");
  do {
    terminology.terms()
        .filter(term -> term.isPropertySet(TermProperty.DEPTH) && term.getDepth() == depth.intValue())
        .forEach(term -> {
          final int ownFrequency = term.getIndependantFrequency();
          getBases(term).forEach(base ->
              base.setProperty(
                  TermProperty.INDEPENDANT_FREQUENCY,
                  base.getIndependantFrequency() - ownFrequency));
        });
  } while (depth.decrementAndGet() > 0);

  // Step 4: independence = independent frequency / raw frequency.
  logger.debug("Computing INDEPENDANCE for all terms");
  terminology.terms().forEach(term ->
      term.setProperty(
          TermProperty.INDEPENDANCE,
          (double) term.getIndependantFrequency() / term.getFrequency()));
}
/**
 * Records the outcome of transferring one piece from {@code dstCid} to {@code srcCid}.
 *
 * <p>The call is rejected when {@code srcCid} is blank or has no progress bit set. A
 * {@code SUCCESS} status is first handed to {@code processPieceSuc}; a {@code SEMISUC} status is
 * treated as {@code SUCCESS} from the bit-set update onwards. If the piece/status bit is already
 * set, or the bit-set update reports no change, nothing further happens. Otherwise the peer
 * success/failure statistics are updated and, for a non-blank {@code dstCid}, its producer load
 * is decremented without letting it stay below zero.
 *
 * @param taskId task the piece belongs to
 * @param srcCid client reporting the progress
 * @param dstCid client the piece was fetched from; may be blank
 * @param pieceNum index of the piece
 * @param pieceStatus reported status of the piece
 * @return {@code SUCCESS}, or an error result when validation or {@code processPieceSuc} fails
 */
@Override
public ResultInfo updateProgress(String taskId, String srcCid, String dstCid, int pieceNum,
    PeerPieceStatus pieceStatus) {
  if (StringUtils.isBlank(srcCid)) {
    final String error = "src cid is empty for taskId:" + taskId;
    logger.error(error);
    return new ResultInfo(ResultCode.UNKNOWN_ERROR, error, null);
  }

  final BitSet pieceBitSet = getBitSetByCid(taskId, srcCid);
  if (pieceBitSet == null) {
    final String error = "peer progress not found for taskId:" + taskId + ",cid:" + srcCid;
    logger.error(error);
    return new ResultInfo(ResultCode.UNKNOWN_ERROR, error, null);
  }

  if (PeerPieceStatus.SUCCESS.equals(pieceStatus)) {
    final ResultInfo pieceResult = processPieceSuc(taskId, srcCid, pieceNum);
    if (!pieceResult.successCode()) {
      return pieceResult;
    }
  }

  // From here on a semi-successful piece is handled like a successful one.
  final PeerPieceStatus effectiveStatus =
      PeerPieceStatus.SEMISUC.equals(pieceStatus) ? PeerPieceStatus.SUCCESS : pieceStatus;

  synchronized (pieceBitSet) {
    final boolean alreadyRecorded = pieceBitSet.get(pieceNum * 8 + effectiveStatus.getStatus());
    if (alreadyRecorded) {
      return new ResultInfo(ResultCode.SUCCESS);
    }
    if (!updateProgressBitSet(srcCid, dstCid, pieceBitSet, pieceNum, effectiveStatus)) {
      return new ResultInfo(ResultCode.SUCCESS);
    }
    if (PeerPieceStatus.SUCCESS.equals(effectiveStatus)) {
      processPeerSucInfo(srcCid, dstCid);
    } else if (PeerPieceStatus.FAIL.equals(effectiveStatus)) {
      processPeerFailInfo(srcCid, dstCid);
    }
  }

  if (StringUtils.isNotBlank(dstCid)) {
    final AtomicInteger producerLoad = progressRepo.getProducerLoad(dstCid);
    if (producerLoad != null) {
      final int remainingLoad = producerLoad.decrementAndGet();
      if (remainingLoad < 0) {
        logger.warn("client load maybe illegal,taskId:{},cid:{},pieceNum:{},load:{}",
            taskId, dstCid, pieceNum, remainingLoad);
        // Undo the decrement so the load does not stay negative.
        producerLoad.incrementAndGet();
      }
    }
  }
  return new ResultInfo(ResultCode.SUCCESS);
}
/**
 * Starts {@code numberOfProducers} threads that log messages until the shared
 * {@code numberOfLogs} budget is used up, while this thread prints throughput statistics once
 * per second and adjusts the per-thread sleep time. Shuts the log manager down afterwards.
 *
 * @param logger logger the producers write to
 * @param marker marker attached to every log call
 * @param numberOfProducers number of producer threads to start
 * @param logSupplier source of the logged objects
 * @throws InterruptedException if this thread is interrupted while sleeping
 */
<T> void indexLogs(Logger logger, Marker marker, int numberOfProducers, Supplier<T> logSupplier) throws InterruptedException {
  final AtomicInteger sleepTime = new AtomicInteger(INITIAL_SLEEP_PER_THREAD);
  final CountDownLatch producersRunning = new CountDownLatch(numberOfProducers);
  final int numberOfLogsToDeliver = numberOfLogs.get();
  final AtomicInteger delivered = new AtomicInteger();

  final Runnable producer = () -> {
    while (numberOfLogs.decrementAndGet() >= 0) {
      logger.info(marker, logSupplier.get());
      localCounter.incrementAndGet();
      try {
        sleep(sleepTime.get());
      } catch (InterruptedException e) {
        interrupted();
      }
      delivered.incrementAndGet();
    }
    producersRunning.countDown();
  };

  for (int started = 0; started < numberOfProducers; started++) {
    new Thread(producer).start();
  }

  final int limitPerSec = getInt("smokeTest.limitPerSec", 10000);
  while (producersRunning.getCount() != 0) {
    sleep(1000);
    // Logs written during the last second; the counter is reset for the next interval.
    final int throughput = localCounter.getAndSet(0);
    if (throughput > limitPerSec && sleepTime.get() != 1) {
      sleepTime.incrementAndGet();
    } else if (sleepTime.get() > 1) {
      sleepTime.decrementAndGet();
    }
    System.out.println(String.format(
        "Sleep millis per thread: %d, Current throughput: %d; Progress: %d/%d",
        sleepTime.get(),
        throughput,
        delivered.get(),
        numberOfLogsToDeliver));
  }

  sleep(MILLIS_BEFORE_SHUTDOWN);
  System.out.println("Shutting down");
  LogManager.shutdown();
  sleep(MILLIS_AFTER_SHUTDOWN);
}
/**
 * Wraps a directory stream and exposes an iterator over the paths found in its
 * sub-directories.
 *
 * <p>For every entry of {@code directoryStream} that is a directory, a child
 * {@code GlobDirectoryStream} is opened and its elements are returned. At most one child stream
 * is open at a time; {@code openCounter} is incremented when a child is opened and decremented
 * when it is closed.
 *
 * @param directoryStream stream over the entries of the current directory
 * @param globPath glob passed on to child streams
 * @param filter filter passed on to child streams
 * @param openCounter shared counter of currently open child streams
 * @throws IOException declared by the constructor signature
 */
GlobDirectoryStream(final DirectoryStream<Path> directoryStream, final Path globPath,
    final DirectoryStream.Filter<Path> filter, final AtomicInteger openCounter) throws IOException {
  this.openCounter = openCounter;
  this.directoryStream = directoryStream;
  final Iterator<Path> dirIterator = directoryStream.iterator();
  iterator = new Iterator<Path>() {
    Iterator<Path> current = Collections.emptyIterator();

    /** Closes the currently open child stream, if any, and updates the open counter. */
    private void closeChildIfOpen() throws IOException {
      if (childDirectoryStream == null) {
        return;
      }
      childDirectoryStream.close();
      childDirectoryStream = null;
      openCounter.decrementAndGet();
    }

    @Override
    public boolean hasNext() {
      try {
        // Advance through the directory entries until a child yields an element.
        while (!current.hasNext() && dirIterator.hasNext()) {
          closeChildIfOpen();
          final Path basePath = dirIterator.next();
          if (Files.isDirectory(basePath)) {
            childDirectoryStream = new GlobDirectoryStream(basePath, globPath, filter, openCounter);
            openCounter.incrementAndGet();
            current = childDirectoryStream.iterator();
          }
        }
        final boolean more = current.hasNext();
        if (!more) {
          // Exhausted: make sure the last child stream does not stay open.
          closeChildIfOpen();
        }
        return more;
      } catch (IOException ex) {
        throw new RuntimeException(ex);
      }
    }

    @Override
    public Path next() {
      Utils.checkState(hasNext(), "Iterator does not have more elements");
      return current.next();
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException();
    }
  };
}
/**
 * Combines a collection of {@link ListenableFuture}s into one {@link ListenableFuture} that is
 * completed by the first result any of them provides.
 * <p>
 * With {@code ignoreErrors} set to {@code false} the returned future mirrors whichever future
 * completes first, including a failure. With {@code ignoreErrors} set to {@code true} failures
 * are skipped until either a result arrives or every future has failed; in the latter case the
 * last failure is reported on the returned future. Tracking the failures adds a small amount of
 * bookkeeping overhead.
 * <p>
 * Only the first result is expected to be of interest, so once it is available the remaining
 * futures are cancelled. If results of futures that already started may still be wanted, pass
 * {@code interruptOnCancel} as {@code false} so they can run to completion; the collection can
 * then be inspected for done futures. Otherwise interrupting started futures is usually the
 * better choice.
 *
 * @since 5.38
 * @param <T> type of result provided in the returned future
 * @param c Collection of futures to monitor for result
 * @param ignoreErrors {@code false} to communicate the first completed future state, even if in error
 * @param interruptOnCancel {@code true} to send a interrupt on any running futures after we have a result
 * @return A future which will be provided the first result from any in the provided {@link Collection}
 */
public static <T> ListenableFuture<T> makeFirstResultFuture(Collection<? extends ListenableFuture<? extends T>> c,
    boolean ignoreErrors, boolean interruptOnCancel) {
  SettableListenableFuture<T> result = new SettableListenableFuture<>(false);
  // Only needed when failures are skipped: counts how many futures may still fail.
  final AtomicInteger errorsRemaining = ignoreErrors ? new AtomicInteger(c.size()) : null;

  FutureCallback<T> callback = new FutureCallback<T>() {
    @Override
    public void handleResult(T t) {
      if (result.setResult(t)) {
        FutureUtils.cancelIncompleteFutures(c, interruptOnCancel);
      }
    }

    @Override
    public void handleFailure(Throwable t) {
      if (errorsRemaining == null) {
        // The first completion wins, even when it is a failure.
        if (result.setFailure(t)) {
          FutureUtils.cancelIncompleteFutures(c, interruptOnCancel);
        }
      } else if (errorsRemaining.decrementAndGet() == 0) {
        // ignore failures till we reach the last failure
        result.setFailure(t);
      }
    }
  };

  c.forEach(future -> future.callback(callback));
  return result;
}
/**
 * Inserts all given rows into the settings table addressed by {@code uri} inside a single
 * database transaction.
 *
 * <p>Inserts into the favorites table are ignored. While the insert is in progress the calling
 * user's in-flight mutation counter (if one is registered) is incremented. The counter is
 * released in an outer {@code finally}, so it is decremented even when obtaining the database,
 * beginning the transaction or ending the transaction throws.
 *
 * @param uri settings URI identifying the target table
 * @param values rows to insert
 * @return {@code values.length} when every row was inserted; {@code 0} when the table is the
 *         favorites table or any single insert failed (the transaction is then not marked
 *         successful)
 */
@Override
public int bulkInsert(Uri uri, ContentValues[] values) {
  final int callingUser = UserHandle.getCallingUserId();
  if (LOCAL_LOGV) Slog.v(TAG, "bulkInsert() for user " + callingUser);
  SqlArguments args = new SqlArguments(uri);
  if (TABLE_FAVORITES.equals(args.table)) {
    return 0;
  }
  checkWritePermissions(args);
  SettingsCache cache = cacheForTable(callingUser, args.table);

  final AtomicInteger mutationCount;
  synchronized (this) {
    mutationCount = sKnownMutationsInFlight.get(callingUser);
  }
  if (mutationCount != null) {
    mutationCount.incrementAndGet();
  }
  try {
    DatabaseHelper dbH = getOrEstablishDatabase(
        TABLE_GLOBAL.equals(args.table) ? UserHandle.USER_OWNER : callingUser);
    SQLiteDatabase db = dbH.getWritableDatabase();
    db.beginTransaction();
    try {
      for (ContentValues value : values) {
        checkUserRestrictions(value.getAsString(Settings.Secure.NAME), callingUser);
        // Abort on the first failed insert; the transaction is not marked successful.
        if (db.insert(args.table, null, value) < 0) return 0;
        SettingsCache.populate(cache, value);
        if (LOCAL_LOGV) Log.v(TAG, args.table + " <- " + value);
      }
      db.setTransactionSuccessful();
    } finally {
      db.endTransaction();
    }
  } finally {
    // Always release the in-flight marker, whatever happened above.
    if (mutationCount != null) {
      mutationCount.decrementAndGet();
    }
  }

  sendNotify(uri, callingUser);
  return values.length;
}
/**
 * Tries to pick a client id from the head of {@code pieceContainer} that can serve a piece.
 *
 * <p>At most as many candidates are examined as the container held on entry. A candidate is
 * selected when it is not marked as down, its error count is below
 * {@code Constants.ELIMINATION_LIMIT}, it is not in {@code blackSet}, and its producer load
 * after being incremented does not exceed {@code Constants.PEER_UP_LIMIT}. The load increment
 * is kept for the selected candidate and undone for a rejected one.
 *
 * @param blackSet client ids that must not be selected; may be {@code null}
 * @return the selected client id, or {@code null} if no candidate qualified
 */
private String tryGetCid(Set<String> blackSet) {
AtomicInteger load;
String cid = null;
String tmpCid;
// Bound the scan by the size on entry so re-offered candidates are not revisited forever.
int times = pieceContainer.size();
// Whether the examined candidate is put back into the container after being removed.
boolean needOffer;
while (times-- > 0) {
needOffer = true;
tmpCid = pieceContainer.peek();
if (tmpCid == null) {
pieceHitLogger.info("peek element from empty queue");
break;
}
if (progressRepo.getServiceDownInfo(tmpCid)) {
// Candidate is reported as down: drop it from the container.
needOffer = false;
} else {
AtomicInteger errorCount = progressRepo
.getServiceErrorInfo(tmpCid);
if (errorCount != null
&& errorCount.get() >= Constants.ELIMINATION_LIMIT) {
// Too many errors: drop it from the container.
needOffer = false;
} else {
if (blackSet == null || !blackSet.contains(tmpCid)) {
load = progressRepo.getProducerLoad(tmpCid);
if (load != null) {
// Reserve one unit of load optimistically; keep it only if the limit is respected.
if (load.incrementAndGet() <= Constants.PEER_UP_LIMIT) {
cid = tmpCid;
// Selected: leave the loop without touching the container.
break;
} else {
load.decrementAndGet();
}
} else {
// No load record for this candidate: drop it from the container.
needOffer = false;
}
}
}
}
// Remove the rejected candidate from the head and, if it is still eligible later,
// offer it again (presumably appending it at the tail — confirm container type).
// The head is re-checked under the lock because it was peeked without holding it.
synchronized (pieceContainer) {
if (StringUtils.equals(pieceContainer.peek(), tmpCid)) {
if (pieceContainer.remove(tmpCid) && needOffer) {
pieceContainer.offer(tmpCid);
}
}
}
}
return cid;
}
/**
 * Verify that "write" operations for a single table are serialized,
 * but different tables can be executed in parallel.
 *
 * <p>The test enqueues EDIT procedures for {@code NUM_TABLES} tables, lets twice as many worker
 * threads acquire and release them, and asserts that no table is ever processed by two workers
 * at once and that at most {@code NUM_TABLES} procedures run concurrently. Finally it checks
 * that the queue of every table that was actually used can be marked as deleted.
 */
@Test
public void testConcurrentWriteOps() throws Exception {
  final TestTableProcSet procSet = new TestTableProcSet(queue);

  final int NUM_ITEMS = 10;
  final int NUM_TABLES = 4;
  final AtomicInteger opsCount = new AtomicInteger(0);
  for (int i = 0; i < NUM_TABLES; ++i) {
    TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
    // Note: j starts at 1, so NUM_ITEMS - 1 procedures are added per table; opsCount tracks
    // the real number.
    for (int j = 1; j < NUM_ITEMS; ++j) {
      procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
        TableProcedureInterface.TableOperationType.EDIT));
      opsCount.incrementAndGet();
    }
  }
  assertEquals(opsCount.get(), queue.size());

  final Thread[] threads = new Thread[NUM_TABLES * 2];
  // Tables that currently have a procedure running; guarded by its own monitor.
  final HashSet<TableName> concurrentTables = new HashSet<>();
  // Messages of assertion errors raised inside the workers; guarded by its own monitor.
  final ArrayList<String> failures = new ArrayList<>();
  final AtomicInteger concurrentCount = new AtomicInteger(0);
  for (int i = 0; i < threads.length; ++i) {
    threads[i] = new Thread() {
      @Override
      public void run() {
        while (opsCount.get() > 0) {
          try {
            Procedure proc = procSet.acquire();
            if (proc == null) {
              queue.signalAll();
              if (opsCount.get() > 0) {
                continue;
              }
              break;
            }

            TableName tableId = procSet.getTableName(proc);
            synchronized (concurrentTables) {
              // add() returns false if another worker is already running on this table.
              assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId));
            }
            assertTrue(opsCount.decrementAndGet() >= 0);
            try {
              long procId = proc.getProcId();
              int concurrent = concurrentCount.incrementAndGet();
              assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
                concurrent >= 1 && concurrent <= NUM_TABLES);
              LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
              Thread.sleep(2000);
              concurrent = concurrentCount.decrementAndGet();
              LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
              assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
            } finally {
              synchronized (concurrentTables) {
                assertTrue(concurrentTables.remove(tableId));
              }
              procSet.release(proc);
            }
          } catch (Throwable e) {
            LOG.error("Failed " + e.getMessage(), e);
            synchronized (failures) {
              failures.add(e.getMessage());
            }
          } finally {
            queue.signalAll();
          }
        }
      }
    };
    threads[i].start();
  }
  for (int i = 0; i < threads.length; ++i) {
    threads[i].join();
  }
  assertTrue(failures.toString(), failures.isEmpty());
  assertEquals(0, opsCount.get());
  assertEquals(0, queue.size());

  // Use the same index range as the loop that created the tables (0 .. NUM_TABLES - 1);
  // iterating 1 .. NUM_TABLES skipped testtb-0000 and checked a table that was never used.
  for (int i = 0; i < NUM_TABLES; ++i) {
    final TableName table = TableName.valueOf(String.format("testtb-%04d", i));
    final TestTableProcedure dummyProc = new TestTableProcedure(100, table,
      TableProcedureInterface.TableOperationType.DELETE);
    assertTrue("queue should be deleted, table=" + table,
      queue.markTableAsDeleted(table, dummyProc));
  }
}