下面列出了java.util.concurrent.BlockingQueue#addAll ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void cleanupExpiredSockets() {
for (final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values()) {
final List<EndpointConnection> connections = new ArrayList<>();
EndpointConnection connection;
while ((connection = connectionQueue.poll()) != null) {
// If the socket has not been used in 10 seconds, shut it down.
final long lastUsed = connection.getLastTimeUsed();
if (lastUsed < System.currentTimeMillis() - idleExpirationMillis) {
try {
connection.getSocketClientProtocol().shutdown(connection.getPeer());
} catch (final Exception e) {
logger.debug("Failed to shut down {} using {} due to {}",
connection.getSocketClientProtocol(), connection.getPeer(), e);
}
terminate(connection);
} else {
connections.add(connection);
}
}
connectionQueue.addAll(connections);
}
}
private CommunicationWorker[] scheduleWaitFor(Map<Integer, INodeInstance> nodes) {
BlockingQueue<INodeInstance> tasks;
tasks = new ArrayBlockingQueue<>(nodes.size() * 2);
tasks.addAll(nodes.values());
CommunicationWorker[] workers = new CommunicationWorker[numThreads];
workers[0] = new CommunicationWorker(tasks);
doneSignal = new CountDownLatch(numThreads - 1);
for (int i = 1; i < numThreads; i++) {
workers[i] = new CommunicationWorker(tasks);
threads.submit(workers[i]);
}
return workers;
}
private CommunicationWorker[] scheduleWaitFor(Map<Integer, INodeInstance> nodes) {
BlockingQueue<INodeInstance> tasks;
tasks = new ArrayBlockingQueue<>(nodes.size() * 2);
tasks.addAll(nodes.values());
CommunicationWorker[] workers = new CommunicationWorker[numThreads];
workers[0] = new CommunicationWorker(tasks);
doneSignal = new CountDownLatch(numThreads - 1);
for (int i = 1; i < numThreads; i++) {
workers[i] = new CommunicationWorker(tasks);
threads.submit(workers[i]);
}
return workers;
}
private CommunicationWorker[] scheduleWaitFor(Map<Integer, INodeInstance> nodes) {
BlockingQueue<INodeInstance> tasks;
tasks = new ArrayBlockingQueue<>(nodes.size() * 2);
tasks.addAll(nodes.values());
CommunicationWorker[] workers = new CommunicationWorker[numThreads];
workers[0] = new CommunicationWorker(tasks);
doneSignal = new CountDownLatch(numThreads - 1);
for (int i = 1; i < numThreads; i++) {
workers[i] = new CommunicationWorker(tasks);
threads.submit(workers[i]);
}
return workers;
}
@Test
public void testRemoveAll_with_empty_Collection_returns_false_with_no_exception() {
final int cap = 8;
final BlockingQueue<Integer> dbq = new DisruptorBlockingQueue<>(cap);
final Set<Integer> set = new HashSet();
for(int i=0; i<cap; i++) {
set.add(i);
}
dbq.addAll(set);
Assert.assertFalse(dbq.removeAll(Collections.emptySet()));
Assert.assertEquals(cap, dbq.size());
}
private void cleanupExpiredSockets() {
for (final BlockingQueue<EndpointConnection> connectionQueue : connectionQueueMap.values()) {
final List<EndpointConnection> connections = new ArrayList<>();
EndpointConnection connection;
while ((connection = connectionQueue.poll()) != null) {
// If the socket has not been used in 10 seconds, shut it down.
final long lastUsed = connection.getLastTimeUsed();
if (lastUsed < System.currentTimeMillis() - idleExpirationMillis) {
try {
connection.getSocketClientProtocol().shutdown(connection.getPeer());
} catch (final Exception e) {
logger.debug("Failed to shut down {} using {} due to {}",
connection.getSocketClientProtocol(), connection.getPeer(), e);
}
terminate(connection);
} else {
connections.add(connection);
}
}
connectionQueue.addAll(connections);
}
}
private void schedulerExecution(Map<Integer, INodeInstance> nodes) {
BlockingQueue<INodeInstance> tasks;
tasks = new ArrayBlockingQueue<>(nodes.size() * 2);
tasks.addAll(nodes.values());
for (INodeInstance node : tasks) {
node.prepare(config);
}
doneSignal = new CountDownLatch(numThreads);
for (int i = 0; i < numThreads; i++) {
threads.execute(new StreamWorker(tasks));
}
}
private void scheduleWaitFor(Map<Integer, INodeInstance> nodes) {
BlockingQueue<INodeInstance> tasks;
tasks = new ArrayBlockingQueue<>(nodes.size() * 2);
tasks.addAll(nodes.values());
int curTaskSize = tasks.size();
CommunicationWorker[] workers = new CommunicationWorker[curTaskSize];
doneSignal = new CountDownLatch(curTaskSize);
for (int i = 0; i < curTaskSize; i++) {
workers[i] = new CommunicationWorker(tasks);
threads.execute(workers[i]);
}
}
@Test
public void testAddAll() {
final int cap = 100;
final BlockingQueue<Integer> dbq = new MPMCBlockingQueue<>(cap);
Set<Integer> si = new HashSet(cap);
for(int i=0; i<cap/10; i++) {
si.add(Integer.valueOf(i));
}
dbq.addAll(si);
Assert.assertTrue(dbq.containsAll(si));
Set<Integer> ni = new HashSet(cap);
for(int i=0; i<cap/10; i++) {
ni.add(Integer.valueOf(-i));
}
dbq.addAll(ni);
Assert.assertTrue(dbq.containsAll(si));
Assert.assertTrue(dbq.containsAll(ni));
for(int i=2*cap/10; i<2*cap; i++) {
si.add(Integer.valueOf(i));
}
dbq.addAll(si);
Assert.assertEquals(dbq.size(), 128);
}
@Test
public void testAddAll() {
final int cap = 100;
final BlockingQueue<Integer> dbq = new DisruptorBlockingQueue<Integer>(cap);
Set<Integer> si = new HashSet(cap);
for(int i=0; i<cap/10; i++) {
si.add(Integer.valueOf(i));
}
dbq.addAll(si);
Assert.assertTrue(dbq.containsAll(si));
Set<Integer> ni = new HashSet(cap);
for(int i=0; i<cap/10; i++) {
ni.add(Integer.valueOf(-i));
}
dbq.addAll(ni);
Assert.assertTrue(dbq.containsAll(si));
Assert.assertTrue(dbq.containsAll(ni));
for(int i=2*cap/10; i<2*cap; i++) {
si.add(Integer.valueOf(i));
}
dbq.addAll(si);
Assert.assertEquals(dbq.size(), 128);
}
@Test
public void testRemoveAll() {
final int cap = 100;
final BlockingQueue<Integer> dbq = new DisruptorBlockingQueue<Integer>(cap);
Set<Integer> si = new HashSet(cap);
for(int i=0; i<cap/10; i++) {
si.add(Integer.valueOf(i));
}
dbq.addAll(si);
Assert.assertTrue(dbq.containsAll(si));
Set<Integer> ni = new HashSet(cap);
for(int i=1; i<cap/10; i++) {
ni.add(Integer.valueOf(-i));
}
dbq.addAll(ni);
Assert.assertTrue(dbq.containsAll(si));
Assert.assertTrue(dbq.containsAll(ni));
Assert.assertTrue(dbq.removeAll(si));
Assert.assertTrue(dbq.containsAll(ni));
Assert.assertFalse(dbq.containsAll(si));
Assert.assertTrue(dbq.removeAll(ni));
Assert.assertFalse(dbq.containsAll(ni));
Assert.assertFalse(dbq.containsAll(si));
}
@Test
public void testAddAll() {
final int cap = 100;
final BlockingQueue<Integer> dbq = new PushPullBlockingQueue<Integer>(cap);
Set<Integer> si = new HashSet(cap);
for(int i=0; i<cap/10; i++) {
si.add(Integer.valueOf(i));
}
dbq.addAll(si);
Assert.assertTrue(dbq.containsAll(si));
Set<Integer> ni = new HashSet(cap);
for(int i=0; i<cap/10; i++) {
ni.add(Integer.valueOf(-i));
}
dbq.addAll(ni);
Assert.assertTrue(dbq.containsAll(si));
Assert.assertTrue(dbq.containsAll(ni));
for(int i=2*cap/10; i<2*cap; i++) {
si.add(Integer.valueOf(i));
}
dbq.addAll(si);
Assert.assertEquals(dbq.size(), 128);
}
/**
* 向工厂中添加队列.
* @param key 队列Key
* @param queue 队列
*/
public void setQueue(final String key, final ArrayBlockingQueue<Object> queue) {
if (getQueue(key) != null) {
final BlockingQueue<Object> theQueue = getQueue(key);
theQueue.addAll(queue);
} else {
queueMap.put(key, queue);
}
}
public void addBufferedQueue(BlockingQueue<byte[]> bufferedQueue) {
bufferedQueue.addAll(bufferCache);
bufferedQueueList.add(bufferedQueue);
}
public void Do(){
stdout.println("~~~~~~~~~~~~~Start Search Domain~~~~~~~~~~~~~");
BlockingQueue<IHttpRequestResponse> inputQueue = new LinkedBlockingQueue<IHttpRequestResponse>();//use to store messageInfo
BlockingQueue<String> subDomainQueue = new LinkedBlockingQueue<String>();
BlockingQueue<String> similarDomainQueue = new LinkedBlockingQueue<String>();
BlockingQueue<String> relatedDomainQueue = new LinkedBlockingQueue<String>();
inputQueue.addAll(messages);
plist = new ArrayList<DomainProducer>();
for (int i=0;i<=10;i++) {
DomainProducer p = new DomainProducer(inputQueue,subDomainQueue,
similarDomainQueue,relatedDomainQueue,i);
p.start();
plist.add(p);
}
while(true) {//to wait all threads exit.
if (inputQueue.isEmpty() && isAllProductorFinished()) {
stdout.println("~~~~~~~~~~~~~Search Domain Done~~~~~~~~~~~~~");
break;
}else {
try {
Thread.sleep(1*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
}
int oldnumber = GUI.getDomainResult().getSubDomainSet().size();
GUI.getDomainResult().getSubDomainSet().addAll(subDomainQueue);
GUI.getDomainResult().getSimilarDomainSet().addAll(similarDomainQueue);
GUI.getDomainResult().getRelatedDomainSet().addAll(relatedDomainQueue);
int newnumber = GUI.getDomainResult().getSubDomainSet().size();
stdout.println(String.format("~~~~~~~~~~~~~%s subdomains added!~~~~~~~~~~~~~",newnumber-oldnumber));
return;
}
/**
* @param finalCheck
* If the internal nodePtrTbl should be restored for a subsequent
* liveness check. If this is the final/last check, it's pointless
* to re-create the nodePtrTable.
*/
protected int check0(final ITool tool, final boolean finalCheck) throws InterruptedException, IOException {
final long startTime = System.currentTimeMillis();
// Sum up the number of nodes in all disk graphs to indicate the amount
// of work to be done by liveness checking.
long sum = 0L;
for (int i = 0; i < checker.length; i++) {
sum += checker[i].getDiskGraph().size();
}
MP.printMessage(EC.TLC_CHECKING_TEMPORAL_PROPS, new String[] { finalCheck ? "complete" : "current",
Long.toString(sum), checker.length == 1 ? "" : checker.length + " branches of " });
// Copy the array of checkers into a concurrent-enabled queue
// that allows LiveWorker threads to easily get the next
// LiveChecker to work on. We don't really need the FIFO
// ordering of the BlockingQueue, just its support for removing
// elements concurrently.
//
// Logically the queue is the unit of work the group of LiveWorkers
// has to complete. Once the queue is empty, all work is done and
// the LiveWorker threads will terminate.
//
// An alternative implementation could partition the array of
// LiveChecker a-priori and assign one partition to each thread.
// However, that assumes the work in all partitions is evenly
// distributed, which is not necessarily true.
final BlockingQueue<ILiveChecker> queue = new ArrayBlockingQueue<ILiveChecker>(checker.length);
queue.addAll(Arrays.asList(checker));
/*
* A LiveWorker below can either complete a unit of work a) without finding a
* liveness violation, b) finds a violation, or c) fails to check because of an
* exception/error (such as going out of memory). In case an LW fails to check,
* we still wait for all other LWs to complete. A subset of the LWs might have
* found a violation. In other words, the OOM of an LW has lower precedence than
* a violation found by another LW. However, if any LW fails to check, we terminate
* model checking after all LWs completed.
*/
final int wNum = TLCGlobals.doSequentialLiveness() ? 1 : Math.min(checker.length, TLCGlobals.getNumWorkers());
final ExecutorService pool = Executors.newFixedThreadPool(wNum);
// CS is really just a container around the set of Futures returned by the pool. It saves us from
// creating a low-level array.
final CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(pool);
for (int i = 0; i < wNum; i++) {
completionService.submit(new LiveWorker(tool, i, wNum, this, queue, finalCheck));
}
// Wait for all LWs to complete.
pool.shutdown();
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); // wait forever
// Check if any one of the LWs found a violation (ignore failures for now).
ExecutionException ee = null;
for (int i = 0; i < wNum; i++) {
try {
final Future<Boolean> future = completionService.take();
if (future.get()) {
MP.printMessage(EC.TLC_CHECKING_TEMPORAL_PROPS_END,
TLC.convertRuntimeToHumanReadable(System.currentTimeMillis() - startTime));
return EC.TLC_TEMPORAL_PROPERTY_VIOLATED;
}
} catch (final ExecutionException e) {
// handled below!
ee = e;
}
}
// Terminate if any one of the LWs failed c)
if (ee != null) {
final Throwable cause = ee.getCause();
if (cause instanceof OutOfMemoryError) {
MP.printError(EC.SYSTEM_OUT_OF_MEMORY_LIVENESS, cause);
} else if (cause instanceof StackOverflowError) {
MP.printError(EC.SYSTEM_STACK_OVERFLOW, cause);
} else if (cause != null) {
MP.printError(EC.GENERAL, cause);
} else {
MP.printError(EC.GENERAL, ee);
}
System.exit(1);
}
// Reset after checking unless it's the final check:
if (finalCheck == false) {
for (int i = 0; i < checker.length; i++) {
checker[i].getDiskGraph().makeNodePtrTbl();
}
}
MP.printMessage(EC.TLC_CHECKING_TEMPORAL_PROPS_END, TLC.convertRuntimeToHumanReadable(System.currentTimeMillis() - startTime));
return EC.NO_ERROR;
}
private void populateQueuesAsync(BlockingQueue<MARKStarNode> queue,
int maxNodes,
int maxMinimizations){
List<MARKStarNode> leftoverLeaves = new ArrayList<>();
List<MARKStarNode> internalNodes = state.internalNodes;
List<MARKStarNode> leafNodes = state.leafNodes;
while(!queue.isEmpty() && (internalNodes.size() < maxNodes || leafNodes.size() < maxMinimizations)){
MARKStarNode curNode = queue.poll();
Node node = curNode.getConfSearchNode();
ConfIndex index = new ConfIndex(RCs.getNumPos());
node.index(index);
double correctgscore = correctionMatrix.confE(node.assignments);
double hscore = node.getConfLowerBound() - node.gscore;
double confCorrection = Math.min(correctgscore, node.rigidScore) + hscore;
if(!node.isMinimized() && node.getConfLowerBound() - confCorrection > 1e-5) {
recordCorrection(node.getConfLowerBound(), correctgscore - node.gscore);
node.gscore = correctgscore;
if (confCorrection > node.rigidScore) {
System.out.println("Overcorrected"+SimpleConfSpace.formatConfRCs(node.assignments)+": " + confCorrection + " > " + node.rigidScore);
node.gscore = node.rigidScore;
confCorrection = node.rigidScore + hscore;
}
node.setBoundsFromConfLowerAndUpper(confCorrection, node.getConfUpperBound());
curNode.markUpdated();
leftoverLeaves.add(curNode);
continue;
}
BigDecimal diff = curNode.getUpperBound().subtract(curNode.getLowerBound());
if (node.getLevel() < RCs.getNumPos() && internalNodes.size() < maxNodes) {
if(internalNodes.size() < maxNodes) {
internalNodes.add(curNode);
state.internalZ = state.internalZ.add(diff);
}
else leftoverLeaves.add(curNode);
}
else if(shouldMinimize(node) && !correctedNode(leftoverLeaves, curNode, node)) {
if(leafNodes.size() < maxMinimizations) {
leafNodes.add(curNode);
state.leafZ = state.leafZ.add(diff);
}
else
leftoverLeaves.add(curNode);
}
}
queue.addAll(leftoverLeaves);
}