下面列出了java.util.concurrent.ConcurrentLinkedQueue#remove ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void noTicker(String pushId) {
WebSocketInfo info = Okexv3WebSocketExchange.CONSUMERS.get(pushId);
if (null != info) {
String symbol = info.getSymbol();
ConcurrentLinkedQueue<WebSocketInfo<Ticker>> list = Okexv3WebSocketExchange.TICKERS
.getOrDefault(symbol, null);
if (null != list) {
if (list.size() <= 1) {
// 这是最后一个订阅,需要取消订阅
Okexv3WebSocketClient client = Okexv3WebSocketExchange.CLIENTS.get(symbol);
if (null != client) {
client.noTicker();
}
}
list.remove(info);
}
Okexv3WebSocketExchange.CONSUMERS.remove(pushId);
}
}
@Override
public void run() {
for (Entry<String, Integer> e : drpcService.getIdToStart().entrySet()) {
if (TimeUtils.time_delta(e.getValue()) > REQUEST_TIMEOUT_SECS) {
String id = e.getKey();
LOG.warn("DRPC request timed out, id: {} start at {}", id, e.getValue());
ConcurrentLinkedQueue<DRPCRequest> queue = drpcService.acquireQueue(drpcService.getIdToFunction().get(id));
queue.remove(drpcService.getIdToRequest().get(id)); //remove timeout request
Semaphore s = drpcService.getIdToSem().get(id);
if (s != null) {
s.release();
}
drpcService.cleanup(id);
LOG.info("Clear request " + id);
}
}
JStormUtils.sleepMs(10);
}
@Override
public void noDepth(String pushId) {
WebSocketInfo info = Okexv3WebSocketExchange.CONSUMERS.get(pushId);
if (null != info) {
String symbol = info.getSymbol();
ConcurrentLinkedQueue<WebSocketInfo<Depth>> list = Okexv3WebSocketExchange.DEPTH
.getOrDefault(symbol, null);
if (null != list) {
if (list.size() <= 1) {
// 这是最后一个订阅,需要取消订阅
Okexv3WebSocketClient client = Okexv3WebSocketExchange.CLIENTS.get(symbol);
if (null != client) {
client.noDepth();
}
}
list.remove(info);
}
Okexv3WebSocketExchange.CONSUMERS.remove(pushId);
}
}
@Override
public void noTrades(String pushId) {
WebSocketInfo info = Okexv3WebSocketExchange.CONSUMERS.get(pushId);
if (null != info) {
String symbol = info.getSymbol();
ConcurrentLinkedQueue<WebSocketInfo<Trades>> list = Okexv3WebSocketExchange.TRADES
.getOrDefault(symbol, null);
if (null != list) {
if (list.size() <= 1) {
// 这是最后一个订阅,需要取消订阅
Okexv3WebSocketClient client = Okexv3WebSocketExchange.CLIENTS.get(symbol);
if (null != client) {
client.noTrades();
}
}
list.remove(info);
}
Okexv3WebSocketExchange.CONSUMERS.remove(pushId);
}
}
/**
* 如果未获取到锁 可能有冲突 立马返回 等待下次执行
*
* @param logPositions binlog 位置队列 {线程安全}
* @param rms 需要被删除的 对象
*/
private void removeQueueWithLock(ConcurrentLinkedQueue<LogPosition> logPositions,
List<LogPosition> rms) {
if (lock.tryLock()) {
try {
// 删除队列 需要有锁防止写冲突 注意这里的log pos 与 work.removeLogPosition() 为不同属性
for (LogPosition lp : rms) {
logPositions.remove(lp);
keepDump();
}
} finally {
lock.unlock();
}
}
rms.clear();
}
/**
* 获取可用的文件保存路径
* 当所有路径文件夹单位数都超过FolderSize时,返回null
*
* @return
*/
public String getSavePath() {
ConcurrentLinkedQueue<File> availablePath = ImgUploadConfig.getAvailablePath();
Iterator<File> iterator = availablePath.iterator();
while (iterator.hasNext()) {
File file = iterator.next();
if (file.listFiles().length < imgUploadConfig.getFolderSize()) {
return file.getPath();
} else {
availablePath.remove(file);
}
}
return null;
}
public static void main(String[] args) {
int i = 0;
// Without bug fix, OutOfMemoryError was observed at iteration 65120
int iterations = 10 * 65120;
try {
ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<>();
queue.add(0L);
while (i++ < iterations) {
queue.add(1L);
queue.remove(1L);
}
} catch (Error t) {
System.err.printf("failed at iteration %d/%d%n", i, iterations);
throw t;
}
}
public static void main(String[] args) {
int i = 0;
// Without bug fix, OutOfMemoryError was observed at iteration 65120
int iterations = 10 * 65120;
try {
ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<>();
queue.add(0L);
while (i++ < iterations) {
queue.add(1L);
queue.remove(1L);
}
} catch (Error t) {
System.err.printf("failed at iteration %d/%d%n", i, iterations);
throw t;
}
}
public static void main(String[] args) {
int i = 0;
// Without bug fix, OutOfMemoryError was observed at iteration 65120
int iterations = 10 * 65120;
try {
ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<>();
queue.add(0L);
while (i++ < iterations) {
queue.add(1L);
queue.remove(1L);
}
} catch (Error t) {
System.err.printf("failed at iteration %d/%d%n", i, iterations);
throw t;
}
}
private static Enumerable<Object> limitCollect(PipelineOptions options, BeamRelNode node) {
long id = options.getOptionsId();
ConcurrentLinkedQueue<Row> values = new ConcurrentLinkedQueue<>();
checkArgument(
options
.getRunner()
.getCanonicalName()
.equals("org.apache.beam.runners.direct.DirectRunner"),
"SELECT without INSERT is only supported in DirectRunner in SQL Shell.");
int limitCount = getLimitCount(node);
Collector.globalValues.put(id, values);
limitRun(options, node, new Collector(), values, limitCount);
Collector.globalValues.remove(id);
// remove extra retrieved values
while (values.size() > limitCount) {
values.remove();
}
return Linq4j.asEnumerable(rowToAvaticaAndUnboxValues(values));
}
/**
* retainAll(c) retains only those elements of c and reports true if change
*/
public void testRetainAll() {
ConcurrentLinkedQueue q = populatedQueue(SIZE);
ConcurrentLinkedQueue p = populatedQueue(SIZE);
for (int i = 0; i < SIZE; ++i) {
boolean changed = q.retainAll(p);
if (i == 0)
assertFalse(changed);
else
assertTrue(changed);
assertTrue(q.containsAll(p));
assertEquals(SIZE - i, q.size());
p.remove();
}
}
/**
* Cleans up the {@link SnackBarItem} and the {@link Activity} it is tied to
*
* @param activity The {@link Activity} tied to the {@link SnackBarItem}
* @param snackBarItem The {@link SnackBarItem} to clean up
*/
public void disposeSnackBar(Activity activity, SnackBarItem snackBarItem) {
ConcurrentLinkedQueue<SnackBarItem> list = mQueue.get(activity);
if (list != null) {
list.remove(snackBarItem);
if (list.peek() == null) {
mQueue.remove(activity);
mIsShowingSnackBar = false;
} else if (!mIsCanceling) {
mIsShowingSnackBar = true;
list.peek().show();
}
}
}
public static void main(String[] args) {
int i = 0;
// Without bug fix, OutOfMemoryError was observed at iteration 65120
int iterations = 10 * 65120;
try {
ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<>();
queue.add(0L);
while (i++ < iterations) {
queue.add(1L);
queue.remove(1L);
}
} catch (Error t) {
System.err.printf("failed at iteration %d/%d%n", i, iterations);
throw t;
}
}
/**
*
* @param backendServer
* @param backendChannelContext
* @return
*/
public static boolean remove(ChannelContext backendChannelContext)
{
BackendServerConf backendServer = BackendExt.getBackendServer(backendChannelContext);
ConcurrentLinkedQueue<ChannelContext> queue = queueMap.get(backendServer);
return queue.remove(backendChannelContext);
}
/**
* remove removes next element, or throws NSEE if empty
*/
public void testRemove() {
ConcurrentLinkedQueue q = populatedQueue(SIZE);
for (int i = 0; i < SIZE; ++i) {
assertEquals(i, q.remove());
}
try {
q.remove();
shouldThrow();
} catch (NoSuchElementException success) {}
}
protected LinkedList<BackendConnection> getNeedHeartbeatCons(
ConcurrentLinkedQueue<BackendConnection> checkLis, long heartbeatTime, long closeTime) {
int maxConsInOneCheck = 10;
LinkedList<BackendConnection> heartbeatCons = new LinkedList<BackendConnection>();
Iterator<BackendConnection> checkListItor = checkLis.iterator();
while (checkListItor.hasNext()) {
BackendConnection con = checkListItor.next();
if ( con.isClosed() ) {
checkListItor.remove();
continue;
}
// 关闭 闲置过久的 connection
if (con.getLastTime() < closeTime) {
if(checkLis.remove(con)) {
con.close("heartbeate idle close ");
continue;
}
}
// 提取需要做心跳检测的 connection
if (con.getLastTime() < heartbeatTime && heartbeatCons.size() < maxConsInOneCheck) {
// 如果移除失败,说明该连接已经被其他线程使用
if(checkLis.remove(con)) {
con.setBorrowed(true);
heartbeatCons.add(con);
}
}
}
return heartbeatCons;
}
private void checkIfNeedHeartBeat(
LinkedList<BackendConnection> heartBeatCons, ConQueue queue,
ConcurrentLinkedQueue<BackendConnection> checkLis,
long hearBeatTime, long hearBeatTime2) {
int maxConsInOneCheck = 10;
Iterator<BackendConnection> checkListItor = checkLis.iterator();
while (checkListItor.hasNext()) {
BackendConnection con = checkListItor.next();
if (con.isClosedOrQuit()) {
checkListItor.remove();
continue;
}
if (validSchema(con.getSchema())) {
if (con.getLastTime() < hearBeatTime
&& heartBeatCons.size() < maxConsInOneCheck) {
if(checkLis.remove(con)) {
//如果移除成功,则放入到心跳连接中,如果移除失败,说明该连接已经被其他线程使用,忽略本次心跳检测
con.setBorrowed(true);
heartBeatCons.add(con);
}
}
} else if (con.getLastTime() < hearBeatTime2) {
// not valid schema conntion should close for idle
// exceed 2*conHeartBeatPeriod
// 同样,这里也需要先移除,避免被业务连接
if(checkLis.remove(con)) {
con.close(" heart beate idle ");
}
}
}
}
/**
* removeAll(c) removes only those elements of c and reports true if changed
*/
public void testRemoveAll() {
for (int i = 1; i < SIZE; ++i) {
ConcurrentLinkedQueue q = populatedQueue(SIZE);
ConcurrentLinkedQueue p = populatedQueue(i);
assertTrue(q.removeAll(p));
assertEquals(SIZE - i, q.size());
for (int j = 0; j < i; ++j) {
Integer x = (Integer)(p.remove());
assertFalse(q.contains(x));
}
}
}
/**
* remove removes next element, or throws NSEE if empty
*/
public void testRemove() {
ConcurrentLinkedQueue q = populatedQueue(SIZE);
for (int i = 0; i < SIZE; ++i) {
assertEquals(i, q.remove());
}
try {
q.remove();
shouldThrow();
} catch (NoSuchElementException success) {}
}
/**
* isEmpty is true before add, false after
*/
public void testEmpty() {
ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
assertTrue(q.isEmpty());
q.add(one);
assertFalse(q.isEmpty());
q.add(two);
q.remove();
q.remove();
assertTrue(q.isEmpty());
}