下面列出了java.util.concurrent.BlockingQueue#isEmpty ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testTermination() throws Exception {
final Configuration configuration = new Configuration();
final AbstractMonitoringWriter writer = new DumpWriter(configuration);
final BlockingQueue<IMonitoringRecord> writerQueue = new LinkedBlockingQueue<IMonitoringRecord>();
writerQueue.add(new EmptyRecord());
final MonitoringWriterThread thread = new MonitoringWriterThread(writer, writerQueue);
thread.start();
while (!writerQueue.isEmpty()) {
Thread.yield();
}
// thread terminates before the timeout has been reached, i.e.,
// it correctly writes out the EmptyRecord from the writerQueue
thread.terminate();
thread.join(THREAD_STATE_CHANGE_TIMEOUT_IN_MS);
Assert.assertThat(thread.getState(), CoreMatchers.is(State.TERMINATED));
}
public void execute() throws Exception {
ExecutorService[] executorServices = Config.resource_node_processPlatformExecutors();
List<String> list = new ArrayList<>();
for (int i = 0; i < executorServices.length; i++) {
ExecutorService service = executorServices[i];
ThreadPoolExecutor executor = (ThreadPoolExecutor) service;
BlockingQueue<Runnable> queue = executor.getQueue();
list.add(String.format("processPlatform executorServices[%d] completed:%d, block:%d.", i,
executor.getCompletedTaskCount(), queue.size()));
if (!queue.isEmpty()) {
List<String> os = new ArrayList<>();
for (Runnable o : queue) {
os.add(o.getClass().toString());
}
list.add(" +++ blocking: " + StringUtils.join(os, ",") + ".");
}
}
System.out.println(StringUtils.join(list, StringUtils.LF));
}
@Override
public StreamsResultSet readCurrent() {
BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
int batchCount = 0;
while (!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) {
StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue);
if (datum != null) {
++batchCount;
ComponentUtils.offerUntilSuccess(datum, batch);
}
}
boolean pullIsEmpty = batch.isEmpty() && this.datumQueue.isEmpty() && this.executor.isTerminated();
this.isComplete.set(this.previousPullWasEmpty && pullIsEmpty);
this.previousPullWasEmpty = pullIsEmpty;
return new StreamsResultSet(batch);
}
@Override
public void release() {
ConcurrentMap<Integer, BlockingQueue<GatewaySenderEventImpl>> tmpQueueMap =
this.bucketToTempQueueMap;
if(tmpQueueMap != null) {
Iterator<BlockingQueue<GatewaySenderEventImpl>> iter = tmpQueueMap.values().iterator();
while(iter.hasNext()) {
BlockingQueue<GatewaySenderEventImpl> queue =iter.next();
while(!queue.isEmpty()) {
GatewaySenderEventImpl event = queue.remove();
event.release();
}
}
}
}
private void waitForExpectedOperations(int expectedOperations, BlockingQueue<SyslogServerEventIF> queue) throws InterruptedException {
int operations = 0;
int openClose = 0;
long endTime = System.currentTimeMillis() + TimeoutUtil.adjust(5000);
do {
if (queue.isEmpty()) {
Thread.sleep(100);
}
while (!queue.isEmpty()) {
SyslogServerEventIF event = queue.take();
char[] messageChars = event.getMessage().toCharArray();
for (char character : messageChars) {
if (character == '{' || character == '}') {
if (character == '{') {
openClose++;
} else {
openClose--;
}
Assert.assertTrue(openClose >= 0);
if (openClose == 0) operations++;
}
}
}
if (operations >= expectedOperations) {
break;
}
} while (System.currentTimeMillis() < endTime);
Assert.assertEquals(expectedOperations, operations);
}
private static void waitForFinish(BlockingQueue<DBObject> queue) {
while (!queue.isEmpty()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
while (true) {
try {
if (!blockingQueue.isEmpty()) {
final Runnable runnable = blockingQueue.peek();
if (null == runnable) {
break;
}
final RequestTask rt = castRunnable(runnable);
if (rt == null || rt.isStopRun()) {
break;
}
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
if (behind >= maxWaitTimeMillsInQueue) {
if (blockingQueue.remove(runnable)) {
rt.setStopRun(true);
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
}
} else {
break;
}
} else {
break;
}
} catch (Throwable ignored) {
}
}
}
/**
* 在各个队列里清除超时的请求,并返回给客户端系统繁忙
* @param blockingQueue 队列
* @param maxWaitTimeMillsInQueue 超时
*/
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
while (true) {
try {
if (!blockingQueue.isEmpty()) {
final Runnable runnable = blockingQueue.peek();
if (null == runnable) {
break;
}
final RequestTask rt = castRunnable(runnable);
if (rt == null || rt.isStopRun()) {
break;
}
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
if (behind >= maxWaitTimeMillsInQueue) {
if (blockingQueue.remove(runnable)) {
rt.setStopRun(true);
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
}
} else {
break;
}
} else {
break;
}
} catch (Throwable ignored) {
}
}
}
void sendLogs(BlockingQueue<JsonObject> logs) {
ChannelHandlerContext c = ctx;
if (c == null || logs.isEmpty() || !connected) {
return;
}
JsonArray lgs = new JsonArray();
for (int i = 0; i < 1024; ++i) {
JsonObject next = logs.poll();
if (next == null) {
break;
}
lgs.add(next);
}
try {
String spayload = JSON.toJson(lgs);
byte[] payload = spayload.getBytes(StandardCharsets.UTF_8);
ByteBuf buffer = c.alloc().ioBuffer();
ByteBufOutputStream out = new ByteBufOutputStream(buffer);
hubSerializer.serialize(HubMessage.createLog(payload), out);
IOUtils.closeQuietly(out);
BinaryWebSocketFrame frame = new BinaryWebSocketFrame(buffer);
c.writeAndFlush(frame);
lastHubMsg = System.nanoTime();
} catch (IOException ex) {
log.warn("log serialization failed, dropping message", ex);
}
}
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
while (true) {
try {
if (!blockingQueue.isEmpty()) {
final Runnable runnable = blockingQueue.peek();
if (null == runnable) {
break;
}
final RequestTask rt = castRunnable(runnable);
if (rt == null || rt.isStopRun()) {
break;
}
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
if (behind >= maxWaitTimeMillsInQueue) {
if (blockingQueue.remove(runnable)) {
rt.setStopRun(true);
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
}
} else {
break;
}
} else {
break;
}
} catch (Throwable ignored) {
}
}
}
/**
* Handle bulk item saving.
*
* @param itemSavingQueue the queue that's used for saving items
*/
public void performItemSaving(BlockingQueue<Item> itemSavingQueue) {
if (itemSavingQueue.isEmpty()) {
return;
}
List<Item> itemList = new ArrayList<>();
itemSavingQueue.drainTo(itemList);
ItemDao.updateItems(itemList);
}
private static List<ProcessResourceConsumptionEvent> pollEvents(
BlockingQueue<ProcessResourceConsumptionEvent> events) throws Exception {
List<ProcessResourceConsumptionEvent> res = new ArrayList<>();
while (!events.isEmpty()) {
ProcessResourceConsumptionEvent event = events.poll(0, TimeUnit.MILLISECONDS);
res.add(event);
}
return res;
}
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
while (true) {
try {
if (!blockingQueue.isEmpty()) {
final Runnable runnable = blockingQueue.peek();
if (null == runnable) {
break;
}
final RequestTask rt = castRunnable(runnable);
if (rt == null || rt.isStopRun()) {
break;
}
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
if (behind >= maxWaitTimeMillsInQueue) {
if (blockingQueue.remove(runnable)) {
rt.setStopRun(true);
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
}
} else {
break;
}
} else {
break;
}
} catch (Throwable ignored) {
}
}
}
@Test
public void testPut() throws InterruptedException {
final int cap = 10;
final BlockingQueue<Integer> dbq = new PushPullBlockingQueue<Integer>(cap);
for(int i=0; i<cap; i++) {
dbq.offer(Integer.valueOf(i));
}
new Thread(){
@Override
public void run() {
try {
sleep(1000);
// after a second remove one
dbq.poll();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
// in main thread add one
// this operation must wait for thread
dbq.put(Integer.valueOf(cap));
boolean hasValCap = false;
while(!dbq.isEmpty()) {
if(dbq.poll().equals(Integer.valueOf(cap)))
hasValCap = true;
}
Assert.assertTrue(hasValCap);
}
@Ignore // this test flickers in @ParallelRunner
public void testTimeOffer() throws InterruptedException {
final int cap = 16;
final BlockingQueue<Integer> dbq = new PushPullBlockingQueue<Integer>(cap);
for(int i=0; i<cap; i++) {
dbq.offer(Integer.valueOf(i));
}
new Thread(){
@Override
public void run() {
try {
sleep(1000);
// after a second remove one
dbq.poll();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
// expect to fail for only 50 ms
Assert.assertFalse(dbq.offer(Integer.valueOf(cap), 50, TimeUnit.MILLISECONDS));
Assert.assertTrue(dbq.offer(Integer.valueOf(cap), 1550, TimeUnit.MILLISECONDS));
boolean hasValCap = false;
while(!dbq.isEmpty()) {
if(dbq.poll().equals(Integer.valueOf(cap)))
hasValCap = true;
}
Assert.assertTrue(hasValCap);
}
/**
* Closes the pool and all disconnects all idle connections
* Active connections will be closed upon the {@link java.sql.Connection#close close} method is called
* on the underlying connection instead of being returned to the pool
* @param force - true to even close the active connections
*/
protected void close(boolean force) {
//are we already closed
if (this.closed) return;
//prevent other threads from entering
this.closed = true;
//stop background thread
if (poolCleaner!=null) {
poolCleaner.stopRunning();
}
/* release all idle connections */
BlockingQueue<PooledConnection> pool = (!idle.isEmpty())?idle:(force?busy:idle);
while (!pool.isEmpty()) {
try {
//retrieve the next connection
PooledConnection con = pool.poll(1000, TimeUnit.MILLISECONDS);
//close it and retrieve the next one, if one is available
while (con != null) {
//close the connection
if (pool==idle)
release(con);
else
abandon(con);
if (!pool.isEmpty()) {
con = pool.poll(1000, TimeUnit.MILLISECONDS);
} else {
break;
}
} //while
} catch (InterruptedException ex) {
if (getPoolProperties().getPropagateInterruptState()) {
Thread.currentThread().interrupt();
}
}
if (pool.isEmpty() && force && pool!=busy) pool = busy;
}
if (this.getPoolProperties().isJmxEnabled()) this.jmxPool = null;
PoolProperties.InterceptorDefinition[] proxies = getPoolProperties().getJdbcInterceptorsAsArray();
for (int i=0; i<proxies.length; i++) {
try {
JdbcInterceptor interceptor = proxies[i].getInterceptorClass().getConstructor().newInstance();
interceptor.setProperties(proxies[i].getProperties());
interceptor.poolClosed(this);
}catch (Exception x) {
log.debug("Unable to inform interceptor of pool closure.",x);
}
}
}
@Override
protected void releaseAcceptedMessages() {
for (Map.Entry<BSBundle, BlockingQueue<IChunkMessage<IBindingSet>>> e : operatorQueues
.entrySet()) {
final BlockingQueue<IChunkMessage<IBindingSet>> queue = e.getValue();
if (queue.isEmpty())
continue;
final LinkedList<IChunkMessage<IBindingSet>> c = new LinkedList<IChunkMessage<IBindingSet>>();
queue.drainTo(c);
for (IChunkMessage<IBindingSet> msg : c) {
msg.release();
}
}
}
private BlockingQueue<T> getNamedOrCommonQueue(String name) {
BlockingQueue<T> result = getQueue(name);
return result.isEmpty() ? commonQueue : result;
}
public void Do(){
stdout.println("~~~~~~~~~~~~~Start Search Domain~~~~~~~~~~~~~");
BlockingQueue<IHttpRequestResponse> inputQueue = new LinkedBlockingQueue<IHttpRequestResponse>();//use to store messageInfo
BlockingQueue<String> subDomainQueue = new LinkedBlockingQueue<String>();
BlockingQueue<String> similarDomainQueue = new LinkedBlockingQueue<String>();
BlockingQueue<String> relatedDomainQueue = new LinkedBlockingQueue<String>();
inputQueue.addAll(messages);
plist = new ArrayList<DomainProducer>();
for (int i=0;i<=10;i++) {
DomainProducer p = new DomainProducer(inputQueue,subDomainQueue,
similarDomainQueue,relatedDomainQueue,i);
p.start();
plist.add(p);
}
while(true) {//to wait all threads exit.
if (inputQueue.isEmpty() && isAllProductorFinished()) {
stdout.println("~~~~~~~~~~~~~Search Domain Done~~~~~~~~~~~~~");
break;
}else {
try {
Thread.sleep(1*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
}
int oldnumber = GUI.getDomainResult().getSubDomainSet().size();
GUI.getDomainResult().getSubDomainSet().addAll(subDomainQueue);
GUI.getDomainResult().getSimilarDomainSet().addAll(similarDomainQueue);
GUI.getDomainResult().getRelatedDomainSet().addAll(relatedDomainQueue);
int newnumber = GUI.getDomainResult().getSubDomainSet().size();
stdout.println(String.format("~~~~~~~~~~~~~%s subdomains added!~~~~~~~~~~~~~",newnumber-oldnumber));
return;
}
private void populateQueuesAsync(BlockingQueue<MARKStarNode> queue,
int maxNodes,
int maxMinimizations){
List<MARKStarNode> leftoverLeaves = new ArrayList<>();
List<MARKStarNode> internalNodes = state.internalNodes;
List<MARKStarNode> leafNodes = state.leafNodes;
while(!queue.isEmpty() && (internalNodes.size() < maxNodes || leafNodes.size() < maxMinimizations)){
MARKStarNode curNode = queue.poll();
Node node = curNode.getConfSearchNode();
ConfIndex index = new ConfIndex(RCs.getNumPos());
node.index(index);
double correctgscore = correctionMatrix.confE(node.assignments);
double hscore = node.getConfLowerBound() - node.gscore;
double confCorrection = Math.min(correctgscore, node.rigidScore) + hscore;
if(!node.isMinimized() && node.getConfLowerBound() - confCorrection > 1e-5) {
recordCorrection(node.getConfLowerBound(), correctgscore - node.gscore);
node.gscore = correctgscore;
if (confCorrection > node.rigidScore) {
System.out.println("Overcorrected"+SimpleConfSpace.formatConfRCs(node.assignments)+": " + confCorrection + " > " + node.rigidScore);
node.gscore = node.rigidScore;
confCorrection = node.rigidScore + hscore;
}
node.setBoundsFromConfLowerAndUpper(confCorrection, node.getConfUpperBound());
curNode.markUpdated();
leftoverLeaves.add(curNode);
continue;
}
BigDecimal diff = curNode.getUpperBound().subtract(curNode.getLowerBound());
if (node.getLevel() < RCs.getNumPos() && internalNodes.size() < maxNodes) {
if(internalNodes.size() < maxNodes) {
internalNodes.add(curNode);
state.internalZ = state.internalZ.add(diff);
}
else leftoverLeaves.add(curNode);
}
else if(shouldMinimize(node) && !correctedNode(leftoverLeaves, curNode, node)) {
if(leafNodes.size() < maxMinimizations) {
leafNodes.add(curNode);
state.leafZ = state.leafZ.add(diff);
}
else
leftoverLeaves.add(curNode);
}
}
queue.addAll(leftoverLeaves);
}