下面列出了 java.util.concurrent.ConcurrentLinkedQueue#iterator() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Writes a debug-level summary of the binlog position queue.
 *
 * <p>When debug logging is enabled this logs the first element of {@code logPositions}, the
 * number of elements seen while iterating over it, and, if {@code this.throttler} is non-null,
 * the size of the throttler queue. When debug logging is disabled the method does nothing.
 *
 * @param logPositions queue of log positions to report on; it is only read here
 */
private void debugLogPosition(ConcurrentLinkedQueue<LogPosition> logPositions) {
    if (!LogUtils.debug.isDebugEnabled()) {
        return;
    }
    // Count by iterating so the head element can be logged during the same pass.
    int total = 0;
    for (LogPosition position : logPositions) {
        if (total == 0) {
            LogUtils.debug.debug(host + " truncLogPosQueue logPositions head is " + position);
        }
        total++;
    }
    LogUtils.debug.debug(host + " truncLogPosQueue logPositions queue size " + total);
    // Read the field once so the null check and the size() call see the same reference.
    BlockingQueue<Object> throttlerQueue = this.throttler;
    if (throttlerQueue != null) {
        LogUtils.debug.debug(host + " throttler queue size " + throttlerQueue.size());
    }
}
/**
 * Returns a directory path that still has room for more files.
 *
 * <p>Walks the queue returned by {@code ImgUploadConfig.getAvailablePath()} in order and returns
 * the path of the first directory holding fewer entries than
 * {@code imgUploadConfig.getFolderSize()}. Directories found to be full are removed from the
 * queue. A path whose contents cannot be listed ({@link File#listFiles()} returns {@code null}
 * when the path is not a directory or an I/O error occurs) is skipped but left in the queue.
 *
 * @return the path of the first directory with spare capacity, or {@code null} when no
 *         candidate qualifies
 */
public String getSavePath() {
    ConcurrentLinkedQueue<File> availablePath = ImgUploadConfig.getAvailablePath();
    Iterator<File> iterator = availablePath.iterator();
    while (iterator.hasNext()) {
        File file = iterator.next();
        File[] children = file.listFiles();
        if (children == null) {
            // Not a directory, or listing failed: cannot judge capacity, so do not use it
            // and do not evict it either (the failure may be transient).
            continue;
        }
        if (children.length < imgUploadConfig.getFolderSize()) {
            return file.getPath();
        }
        // Folder is full: drop exactly the element just visited.
        iterator.remove();
    }
    return null;
}
/**
 * Collects the status-update URLs that have not been returned before.
 *
 * <p>Reads the side output tagged {@code UrlDBFunction.STATUS_OUTPUT_TAG} of the test harness at
 * {@code subTaskIndex}, skips the first {@code alreadySeen.size()} records and returns the
 * values of the remaining ones in queue order.
 *
 * @param subTaskIndex
 *            index of the test harness partition to read from
 * @param alreadySeen
 *            tuples already grabbed from the test harness; only the size of this list is used,
 *            to decide how many leading records to skip
 * @return the {@link CrawlStateUrl} values that follow the skipped records
 */
private List<CrawlStateUrl> getStatusUpdateUrls(int subTaskIndex,
        List<CrawlStateUrl> alreadySeen) {
    ConcurrentLinkedQueue<StreamRecord<CrawlStateUrl>> sideOutput = _testHarnesses[subTaskIndex]
            .getSideOutput(UrlDBFunction.STATUS_OUTPUT_TAG);
    List<CrawlStateUrl> freshUrls = new ArrayList<CrawlStateUrl>();
    int remainingToSkip = alreadySeen.size();
    for (StreamRecord<CrawlStateUrl> record : sideOutput) {
        CrawlStateUrl url = record.getValue();
        if (remainingToSkip > 0) {
            // Still inside the prefix the caller has already consumed.
            remainingToSkip--;
        } else {
            freshUrls.add(url);
        }
    }
    return freshUrls;
}
/**
 * Collects the output URLs that have not been returned before.
 *
 * <p>Reads the main output of the test harness at {@code subTaskIndex}, skips the first
 * {@code alreadySeen.size()} records and returns the values of the remaining ones in queue
 * order. Every record is cast to {@code StreamRecord<FetchUrl>}.
 *
 * @param subTaskIndex
 *            index of the test harness partition to read from
 * @param alreadySeen
 *            tuples already grabbed from the test harness; only the size of this list is used,
 *            to decide how many leading records to skip
 * @return the {@link FetchUrl} values that follow the skipped records
 */
@SuppressWarnings("unchecked")
private List<FetchUrl> getOutputUrls(int subTaskIndex, List<FetchUrl> alreadySeen) {
    ConcurrentLinkedQueue<Object> harnessOutput = _testHarnesses[subTaskIndex].getOutput();
    List<FetchUrl> freshUrls = new ArrayList<FetchUrl>();
    int remainingToSkip = alreadySeen.size();
    for (Object rawRecord : harnessOutput) {
        FetchUrl url = ((StreamRecord<FetchUrl>) rawRecord).getValue();
        if (remainingToSkip > 0) {
            // Still inside the prefix the caller has already consumed.
            remainingToSkip--;
        } else {
            freshUrls.add(url);
        }
    }
    return freshUrls;
}
/**
 * Merges runs of committed log positions.
 *
 * <pre>
 * node1.isCommit = true  ->  node2.isCommit = true  ->  node3.isCommit = false  ...
 * </pre>
 *
 * <p>Every position that is committed and immediately followed by another committed position
 * is collected and handed to {@code removeQueueWithLock}; in the example above node1 is
 * collected while node2 and everything after it are left alone. Logs a warning and returns
 * when the queue is empty.
 *
 * @param logPositions queue of log positions to compact
 */
private void truncLogPosQueue(ConcurrentLinkedQueue<LogPosition> logPositions) {
    if (logPositions.isEmpty()) {
        LogUtils.warn.warn("no binlog position object in queue");
        return;
    }
    // Given how the concurrent list is implemented, a single size() call already costs a
    // full traversal of the linked list, so walk it once with an iterator instead.
    LinkedList<LogPosition> removable = new LinkedList<LogPosition>();
    Iterator<LogPosition> cursor = logPositions.iterator();
    LogPosition previous = cursor.hasNext() ? cursor.next() : null;
    while (cursor.hasNext()) {
        LogPosition current = cursor.next();
        if (previous.isCommit() && current.isCommit()) {
            removable.add(previous);
        }
        previous = current;
    }
    removeQueueWithLock(logPositions, removable);
    // Do not enable the work log lightly.
    debugLogPosition(logPositions);
}
/**
 * Returns the latest committed log position at the head of the queue and hands the scanned
 * committed positions to {@code removeQueueWithLock}.
 *
 * <p>Scans from the head while positions are committed, looking at no more than
 * {@code THROTTLE_QUEUE_SIZE} of them. The last committed position found has
 * {@code originPos} merged into it and is refreshed before being returned.
 *
 * @return the last committed position of the leading run, or {@code null} when the queue
 *         field is {@code null} or no committed position was found
 */
@Override
public LogPosition getLatestLogPosWithRm() {
    ConcurrentLinkedQueue<LogPosition> queue = this.logPositions;
    if (queue == null) {
        return null;
    }
    // Do not enable the work log lightly.
    debugLogPosition(queue);
    List<LogPosition> committed = new LinkedList<>();
    LogPosition latest = null;
    int scanned = 0;
    // Bound the scan to avoid looping forever; the queue cannot grow too long because the
    // log position truncation keeps it short.
    for (LogPosition position : queue) {
        if (scanned++ >= THROTTLE_QUEUE_SIZE || !position.isCommit()) {
            // Scan budget used up, or first uncommitted position reached: stop.
            break;
        }
        committed.add(position); // queue it for removal
        latest = position;
    }
    removeQueueWithLock(queue, committed);
    if (latest == null) {
        return null;
    }
    latest.mergeOriginLogPos(originPos);
    latest.refresh();
    return latest;
}
/**
 * Returns a copy of the latest committed log position at the head of the queue without
 * modifying the queue.
 *
 * <p>Scans from the head while positions are committed, looking at no more than
 * {@code THROTTLE_QUEUE_SIZE} of them. The last committed position found is cloned; the clone
 * has {@code originPos} merged into it and is refreshed before being returned.
 *
 * @return a clone of the last committed position of the leading run, or {@code null} when the
 *         queue field is {@code null} or no committed position was found
 */
@Override
public LogPosition getLatestLogPos() {
    ConcurrentLinkedQueue<LogPosition> queue = this.logPositions;
    if (queue == null) {
        return null;
    }
    debugLogPosition(queue);
    LogPosition latest = null;
    int scanned = 0;
    // Bound the scan to avoid looping forever; the queue cannot grow too long because the
    // log position truncation keeps it short.
    for (LogPosition position : queue) {
        if (scanned++ >= THROTTLE_QUEUE_SIZE || !position.isCommit()) {
            // Scan budget used up, or first uncommitted position reached: stop.
            break;
        }
        latest = position;
    }
    if (latest == null) {
        return null;
    }
    // Work on a copy so the queued element itself is left untouched.
    LogPosition snapshot = latest.clone();
    snapshot.mergeOriginLogPos(originPos);
    snapshot.refresh();
    return snapshot;
}
/**
 * Removes a single listener registered for the given event.
 *
 * <p>Only the first registered listener for which {@code Emitter.sameAs(fn, listener)} is true
 * is removed. Nothing happens when no listeners are registered for {@code event} or none
 * matches.
 *
 * @param event an event name.
 * @param fn the listener to remove.
 * @return a reference to this object.
 */
public Emitter off(String event, Listener fn) {
    ConcurrentLinkedQueue<Listener> registered = this.callbacks.get(event);
    if (registered == null) {
        return this;
    }
    for (Iterator<Listener> cursor = registered.iterator(); cursor.hasNext();) {
        Listener candidate = cursor.next();
        if (Emitter.sameAs(fn, candidate)) {
            cursor.remove();
            return this;
        }
    }
    return this;
}
/**
 * iterator iterates through all elements: every element it yields is contained in the queue,
 * it yields exactly SIZE elements, and it is exhausted afterwards
 */
public void testIterator() {
    ConcurrentLinkedQueue q = populatedQueue(SIZE);
    Iterator it = q.iterator();
    int visited = 0;
    while (it.hasNext()) {
        assertTrue(q.contains(it.next()));
        visited++;
    }
    // Expected value goes first so a failure message reports the right sides.
    assertEquals(SIZE, visited);
    assertIteratorExhausted(it);
}
/**
 * iterator yields elements in insertion (FIFO) order
 */
public void testIteratorOrdering() {
    final ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
    q.add(one);
    q.add(two);
    q.add(three);
    int seen = 0;
    Iterator it = q.iterator();
    while (it.hasNext()) {
        // The n-th element yielded must equal n.
        assertEquals(++seen, it.next());
    }
    assertEquals(3, seen);
}
/**
 * Removing elements from the queue while iterating does not make the iterator fail
 */
public void testWeaklyConsistentIteration() {
    final ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
    q.add(one);
    q.add(two);
    q.add(three);
    Iterator it = q.iterator();
    while (it.hasNext()) {
        // Remove the head between hasNext() and next(); next() must still succeed.
        q.remove();
        it.next();
    }
    assertEquals("queue should be empty again", 0, q.size());
}
/**
 * iterator.remove removes the element last returned by next(); a fresh iterator then yields
 * only the remaining elements, in order
 */
public void testIteratorRemove() {
    final ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
    q.add(one);
    q.add(two);
    q.add(three);
    Iterator it = q.iterator();
    it.next();
    it.remove();
    it = q.iterator();
    // Expected value goes first so a failure message reports the right sides.
    assertSame(two, it.next());
    assertSame(three, it.next());
    assertFalse(it.hasNext());
}
/**
 * Lists the second-level entries of the tree under {@code root2}.
 *
 * <p>For every child of {@code root2} and every child of that child, produces a two-element
 * array: index 0 is the first-level node's {@code toString()}, index 1 the second-level node's
 * {@code toString()}. The pairs are gathered into a fresh queue first.
 *
 * @return an iterator over the collected pairs, in tree order
 */
public Iterator<String[]> getItems() {
    ConcurrentLinkedQueue<String[]> pairs = new ConcurrentLinkedQueue<String[]>();
    for (int chartIndex = 0; chartIndex < root2.getChildCount(); chartIndex++) {
        TreeNode chartNode = root2.getChildAt(chartIndex);
        String chartName = chartNode.toString();
        for (int itemIndex = 0; itemIndex < chartNode.getChildCount(); itemIndex++) {
            String itemName = chartNode.getChildAt(itemIndex).toString();
            pairs.add(new String[] {chartName, itemName});
        }
    }
    return pairs.iterator();
}
/**
 * Scans a queue of connections and picks out those that need a heartbeat or should be closed.
 *
 * <ul>
 * <li>Connections reporting {@code isClosedOrQuit()} are dropped from {@code checkLis}.</li>
 * <li>Connections with a valid schema whose last-use time is before {@code hearBeatTime} are
 * moved into {@code heartBeatCons} and marked borrowed, up to 10 entries in that list.</li>
 * <li>Connections with an invalid schema whose last-use time is before {@code hearBeatTime2}
 * are removed from {@code checkLis} and closed.</li>
 * </ul>
 *
 * @param heartBeatCons output list receiving the connections selected for a heartbeat
 * @param queue not used by this method
 * @param checkLis queue of connections to examine; selected connections are removed from it
 * @param hearBeatTime last-use times below this make a valid-schema connection a candidate
 * @param hearBeatTime2 last-use times below this make an invalid-schema connection get closed
 */
private void checkIfNeedHeartBeat(
        LinkedList<BackendConnection> heartBeatCons, ConQueue queue,
        ConcurrentLinkedQueue<BackendConnection> checkLis,
        long hearBeatTime, long hearBeatTime2) {
    int maxConsInOneCheck = 10;
    for (Iterator<BackendConnection> cursor = checkLis.iterator(); cursor.hasNext();) {
        BackendConnection candidate = cursor.next();
        if (candidate.isClosedOrQuit()) {
            cursor.remove();
            continue;
        }
        if (!validSchema(candidate.getSchema())) {
            // A connection on an invalid schema is closed once idle for longer than
            // 2 * conHeartBeatPeriod. It is removed from the queue first so that business
            // code cannot pick it up in the meantime.
            if (candidate.getLastTime() < hearBeatTime2 && checkLis.remove(candidate)) {
                candidate.close(" heart beate idle ");
            }
            continue;
        }
        boolean dueForHeartbeat = candidate.getLastTime() < hearBeatTime
                && heartBeatCons.size() < maxConsInOneCheck;
        // Only a successful removal hands the connection to the heartbeat list; a failed
        // removal means another thread is already using it, so this round skips it.
        if (dueForHeartbeat && checkLis.remove(candidate)) {
            candidate.setBorrowed(true);
            heartBeatCons.add(candidate);
        }
    }
}
/**
 * Scans a queue of connections, closes the ones idle for too long and returns the ones that
 * need a heartbeat.
 *
 * <ul>
 * <li>Connections reporting {@code isClosed()} are dropped from {@code checkLis}.</li>
 * <li>Connections whose last-use time is before {@code closeTime} are removed from
 * {@code checkLis} and closed.</li>
 * <li>Connections whose last-use time is before {@code heartbeatTime} are removed from
 * {@code checkLis}, marked borrowed and returned, up to 10 per call.</li>
 * </ul>
 *
 * @param checkLis queue of connections to examine; selected connections are removed from it
 * @param heartbeatTime last-use times below this make a connection a heartbeat candidate
 * @param closeTime last-use times below this make a connection get closed
 * @return the connections selected for a heartbeat check; never {@code null}
 */
protected LinkedList<BackendConnection> getNeedHeartbeatCons(
        ConcurrentLinkedQueue<BackendConnection> checkLis, long heartbeatTime, long closeTime) {
    int maxConsInOneCheck = 10;
    LinkedList<BackendConnection> selected = new LinkedList<BackendConnection>();
    for (Iterator<BackendConnection> cursor = checkLis.iterator(); cursor.hasNext();) {
        BackendConnection candidate = cursor.next();
        if (candidate.isClosed()) {
            cursor.remove();
            continue;
        }
        // Close connections that have been idle for too long.
        if (candidate.getLastTime() < closeTime && checkLis.remove(candidate)) {
            candidate.close("heartbeate idle close ");
            continue;
        }
        // Pick the connections that need a heartbeat check. A failed removal means the
        // connection is already in use by another thread, so it is left alone.
        if (candidate.getLastTime() < heartbeatTime
                && selected.size() < maxConsInOneCheck
                && checkLis.remove(candidate)) {
            candidate.setBorrowed(true);
            selected.add(candidate);
        }
    }
    return selected;
}
/**
 * Removes a single listener registered for the given event.
 *
 * <p>Only the first registered listener for which {@code Emitter.sameAs(fn, listener)} is true
 * is removed. Nothing happens when no listeners are registered for {@code event} or none
 * matches.
 *
 * @param event an event name.
 * @param fn the listener to remove.
 * @return a reference to this object.
 */
public Emitter off(String event, Listener fn) {
    ConcurrentLinkedQueue<Listener> registered = this.callbacks.get(event);
    if (registered == null) {
        return this;
    }
    for (Iterator<Listener> cursor = registered.iterator(); cursor.hasNext();) {
        Listener candidate = cursor.next();
        if (Emitter.sameAs(fn, candidate)) {
            cursor.remove();
            return this;
        }
    }
    return this;
}
/**
 * iterator iterates through all elements: every element it yields is contained in the queue,
 * it yields exactly SIZE elements, and it is exhausted afterwards
 */
public void testIterator() {
    ConcurrentLinkedQueue q = populatedQueue(SIZE);
    Iterator it = q.iterator();
    int visited = 0;
    while (it.hasNext()) {
        assertTrue(q.contains(it.next()));
        visited++;
    }
    // Expected value goes first so a failure message reports the right sides.
    assertEquals(SIZE, visited);
    assertIteratorExhausted(it);
}
/**
 * iterator yields elements in insertion (FIFO) order
 */
public void testIteratorOrdering() {
    final ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
    q.add(one);
    q.add(two);
    q.add(three);
    int seen = 0;
    Iterator it = q.iterator();
    while (it.hasNext()) {
        // The n-th element yielded must equal n.
        assertEquals(++seen, it.next());
    }
    assertEquals(3, seen);
}
/**
 * Removing elements from the queue while iterating does not make the iterator fail
 */
public void testWeaklyConsistentIteration() {
    final ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
    q.add(one);
    q.add(two);
    q.add(three);
    Iterator it = q.iterator();
    while (it.hasNext()) {
        // Remove the head between hasNext() and next(); next() must still succeed.
        q.remove();
        it.next();
    }
    assertEquals("queue should be empty again", 0, q.size());
}
/**
 * iterator.remove removes the element last returned by next(); a fresh iterator then yields
 * only the remaining elements, in order
 */
public void testIteratorRemove() {
    final ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
    q.add(one);
    q.add(two);
    q.add(three);
    Iterator it = q.iterator();
    it.next();
    it.remove();
    it = q.iterator();
    // Expected value goes first so a failure message reports the right sides.
    assertSame(two, it.next());
    assertSame(three, it.next());
    assertFalse(it.hasNext());
}