下面列出了java.util.concurrent.LinkedBlockingQueue#addAll ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void collectLedgersFromDL(final URI uri,
final com.twitter.distributedlog.DistributedLogManagerFactory factory,
final Set<Long> ledgers) throws IOException {
logger.info("Enumerating {} to collect streams.", uri);
Collection<String> streams = factory.enumerateAllLogsInNamespace();
final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
streamQueue.addAll(streams);
logger.info("Collected {} streams from uri {} : {}",
new Object[] { streams.size(), uri, streams });
executeAction(streamQueue, 10, new Action<String>() {
@Override
public void execute(String stream) throws IOException {
collectLedgersFromStream(factory, stream, ledgers);
}
});
}
private Map<String, Long> calculateStreamSpaceUsage(
final URI uri, final com.twitter.distributedlog.DistributedLogManagerFactory factory)
throws IOException {
Collection<String> streams = factory.enumerateAllLogsInNamespace();
final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
streamQueue.addAll(streams);
final Map<String, Long> streamSpaceUsageMap =
new ConcurrentSkipListMap<String, Long>();
final AtomicInteger numStreamsCollected = new AtomicInteger(0);
executeAction(streamQueue, 10, new Action<String>() {
@Override
public void execute(String stream) throws IOException {
streamSpaceUsageMap.put(stream,
calculateStreamSpaceUsage(factory, stream));
if (numStreamsCollected.incrementAndGet() % 1000 == 0) {
logger.info("Calculated {} streams from uri {}.", numStreamsCollected.get(), uri);
}
}
});
return streamSpaceUsageMap;
}
/**
* addAll(this) throws IllegalArgumentException
*/
public void testAddAllSelf() {
LinkedBlockingQueue q = populatedQueue(SIZE);
try {
q.addAll(q);
shouldThrow();
} catch (IllegalArgumentException success) {}
}
/**
* addAll of a collection with any null elements throws NPE after
* possibly adding some elements
*/
public void testAddAll3() {
LinkedBlockingQueue q = new LinkedBlockingQueue(SIZE);
Integer[] ints = new Integer[SIZE];
for (int i = 0; i < SIZE - 1; ++i)
ints[i] = new Integer(i);
Collection<Integer> elements = Arrays.asList(ints);
try {
q.addAll(elements);
shouldThrow();
} catch (NullPointerException success) {}
}
/**
* addAll throws IllegalStateException if not enough room
*/
public void testAddAll4() {
LinkedBlockingQueue q = new LinkedBlockingQueue(SIZE - 1);
Integer[] ints = new Integer[SIZE];
for (int i = 0; i < SIZE; ++i)
ints[i] = new Integer(i);
Collection<Integer> elements = Arrays.asList(ints);
try {
q.addAll(elements);
shouldThrow();
} catch (IllegalStateException success) {}
}
private synchronized LinkedBlockingQueue<IAtomContainer> submitSingleThreadedJob(List<IAtomContainer> mcssList, JobType jobType, int nThreads) {
LinkedBlockingQueue<IAtomContainer> solutions = new LinkedBlockingQueue<>();
MCSSThread task = new MCSSThread(mcssList, jobType, 1);
LinkedBlockingQueue<IAtomContainer> results = task.call();
if (results != null) {
solutions.addAll(results);
}
return solutions;
}
/**
* addAll(this) throws IllegalArgumentException
*/
public void testAddAllSelf() {
LinkedBlockingQueue q = populatedQueue(SIZE);
try {
q.addAll(q);
shouldThrow();
} catch (IllegalArgumentException success) {}
}
/**
* addAll of a collection with any null elements throws NPE after
* possibly adding some elements
*/
public void testAddAll3() {
LinkedBlockingQueue q = new LinkedBlockingQueue(SIZE);
Integer[] ints = new Integer[SIZE];
for (int i = 0; i < SIZE - 1; ++i)
ints[i] = new Integer(i);
Collection<Integer> elements = Arrays.asList(ints);
try {
q.addAll(elements);
shouldThrow();
} catch (NullPointerException success) {}
}
/**
* addAll throws IllegalStateException if not enough room
*/
public void testAddAll4() {
LinkedBlockingQueue q = new LinkedBlockingQueue(SIZE - 1);
Integer[] ints = new Integer[SIZE];
for (int i = 0; i < SIZE; ++i)
ints[i] = new Integer(i);
Collection<Integer> elements = Arrays.asList(ints);
try {
q.addAll(elements);
shouldThrow();
} catch (IllegalStateException success) {}
}
private int bkFence(final BookKeeperClient bkc, List<Long> ledgers, int fenceRate) throws Exception {
if (ledgers.isEmpty()) {
System.out.println("Nothing to fence. Done.");
return 0;
}
ExecutorService executorService = Executors.newCachedThreadPool();
final RateLimiter rateLimiter = RateLimiter.create(fenceRate);
final byte[] passwd = getConf().getBKDigestPW().getBytes(UTF_8);
final CountDownLatch latch = new CountDownLatch(ledgers.size());
final AtomicInteger numPendings = new AtomicInteger(ledgers.size());
final LinkedBlockingQueue<Long> ledgersQueue = new LinkedBlockingQueue<Long>();
ledgersQueue.addAll(ledgers);
for (int i = 0; i < concurrency; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
while (!ledgersQueue.isEmpty()) {
rateLimiter.acquire();
Long lid = ledgersQueue.poll();
if (null == lid) {
break;
}
System.out.println("Fencing ledger " + lid);
int numRetries = 3;
while (numRetries > 0) {
try {
LedgerHandle lh = bkc.get().openLedger(lid, BookKeeper.DigestType.CRC32, passwd);
lh.close();
System.out.println("Fenced ledger " + lid + ", " + numPendings.decrementAndGet() + " left.");
latch.countDown();
} catch (BKException.BKNoSuchLedgerExistsException bke) {
System.out.println("Skipped fence non-exist ledger " + lid + ", " + numPendings.decrementAndGet() + " left.");
latch.countDown();
} catch (BKException.BKLedgerRecoveryException lre) {
--numRetries;
continue;
} catch (Exception e) {
e.printStackTrace();
break;
}
numRetries = 0;
}
}
System.out.println("Thread exits");
}
});
}
latch.await();
SchedulerUtils.shutdownScheduler(executorService, 2, TimeUnit.MINUTES);
return 0;
}
private synchronized LinkedBlockingQueue<IAtomContainer> submitMultiThreadedJob(List<IAtomContainer> mcssList, JobType jobType, int nThreads) {
int taskNumber = 1;
LinkedBlockingQueue<IAtomContainer> solutions = new LinkedBlockingQueue<>();
LinkedBlockingQueue<Callable<LinkedBlockingQueue<IAtomContainer>>> callablesQueue = new LinkedBlockingQueue<>();
ExecutorService threadPool = newFixedThreadPool(nThreads);
int step = (int) ceil(mcssList.size() / nThreads);
if (step < 2) {
step = 2; // Can't have a step size of less than 2
}
for (int i = 0; i < mcssList.size(); i += step) {
int endPoint = i + step;
if (endPoint > mcssList.size()) {
endPoint = mcssList.size();
}
List<IAtomContainer> subList = new ArrayList<>(mcssList.subList(i, endPoint));
if (subList.size() > 1) {
MCSSThread mcssJobThread = new MCSSThread(subList, jobType, taskNumber, am ,bm);
callablesQueue.add(mcssJobThread);
taskNumber++;
} else {
solutions.add(subList.get(0));
}
}
try {
/*
* Wait all the threads to finish
*/
List<Future<LinkedBlockingQueue<IAtomContainer>>> futureList = threadPool.invokeAll(callablesQueue);
/*
* Collect the results
*/
for (Future<LinkedBlockingQueue<IAtomContainer>> callable : futureList) {
LinkedBlockingQueue<IAtomContainer> mapping = callable.get();
if (callable.isDone() && mapping != null) {
solutions.addAll(mapping);
} else {
LOGGER.warn("WARNING: InComplete job in AtomMappingTool: ");
}
}
threadPool.shutdown();
// Wait until all threads are finish
while (!threadPool.isTerminated()) {
}
gc();
} catch (InterruptedException | ExecutionException e) {
LOGGER.debug("ERROR: in AtomMappingTool: " + e.getMessage());
LOGGER.error(e);
} finally {
threadPool.shutdown();
}
return solutions;
}