下面列出了java.util.concurrent.DelayQueue#offer ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Decrement reference count for the given session's UGI. Resets the time at which the UGI will
* expire to UGI_CACHE_EXPIRY milliseconds in the future.
*
* @param session the session for which we want to release the UGI.
* @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it
* is now unreferenced).
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {
Entry entry = cache.get(session);
if (entry == null) {
throw new IllegalStateException("Cannot release UGI for this session; it is not cached: " + session);
}
DelayQueue<Entry> expirationQueue = getExpirationQueue(session.getSegmentId());
synchronized (expirationQueue) {
entry.decrementRefCount();
expirationQueue.remove(entry);
if (cleanImmediatelyIfNoRefs && entry.isNotInUse()) {
closeUGI(entry);
} else {
// Reset expiration time and put it back in the queue
// only when we don't close the UGI
entry.resetTime();
expirationQueue.offer(entry);
}
}
}
/**
* Iterate through all the entries in the queue and close expired {@link UserGroupInformation},
* otherwise it resets the timer for every non-expired entry.
*
* @param expirationQueue
*/
private void cleanup(DelayQueue<Entry> expirationQueue) {
Entry expiredUGI;
while ((expiredUGI = expirationQueue.poll()) != null) {
if (expiredUGI.isNotInUse()) {
closeUGI(expiredUGI);
} else {
// The UGI object is still being used by another thread
String fsMsg = "FileSystem for proxy user = " + expiredUGI.getSession().getUser();
LOG.debug("{} Skipping close of {}", expiredUGI.getSession().toString(), fsMsg);
// Place it back in the queue if still in use and was not closed
expiredUGI.resetTime();
expirationQueue.offer(expiredUGI);
}
LOG.debug("Delay Queue Size for segment {} = {}", expiredUGI.getSession().getSegmentId(), expirationQueue.size());
}
}
private void insert(
DelayQueue<WaitForEntry<T>> q, Map<T, WaitForEntry<T>> knownEntries, WaitForEntry entry) {
WaitForEntry existing = knownEntries.get((T) entry.data);
if (existing != null) {
if (Duration.between(existing.readyAtMillis, entry.readyAtMillis).isNegative()) {
q.remove(existing);
existing.readyAtMillis = entry.readyAtMillis;
q.add(existing);
}
return;
}
q.offer(entry);
knownEntries.put((T) entry.data, entry);
}
public static void testDelayQueue(){
DelayQueue<Session> queue=new DelayQueue<Session>();
Random random=new Random(47);
StringBuilder sb=new StringBuilder();
List<Session> list=new ArrayList<Session>();
//生产对象添加到队列中
for(int i=0;i<5;i++){
long timeout=(random.nextInt(10)+1)*1000; //11以内的整数乘以1000毫秒
Session temp=new Session(timeout);
sb.append("id:"+temp.id+"-").append(timeout).append(" ");
list.add(temp);
queue.offer(temp);
}
System.out.println("=========================添加到队列中的顺序=========================");
System.out.println(sb.toString());
//可以先观察queue的排序结果
System.out.println("=========================队列中实际的顺序========================");
System.out.println(iteratorDelayQueue(queue));
System.out.println("=========================启动清理线程==============================");
monitorThread(queue); //启动监控清理线程
//可先不执行延迟清理,进行观察
updateObject(list,queue); //模拟因session最新被调用,而延迟清理
}
/**
* If a UGI for the given session exists in the cache, returns it. Otherwise, creates a new
* proxy UGI. In either case this method increments the reference count of the UGI. This method
* also destroys expired, unreferenced UGIs for the same segmentId as the given session.
*
* @param session The user from the session is impersonated by the proxy UGI.
* @param isProxyUser true if the {@link UserGroupInformation} is a proxy user
* @return the proxy UGI for the given session.
* @throws IOException when there is an IO issue
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public UserGroupInformation getUserGroupInformation(SessionId session, boolean isProxyUser) throws IOException {
Integer segmentId = session.getSegmentId();
String user = session.getUser();
DelayQueue<Entry> delayQueue = getExpirationQueue(segmentId);
synchronized (delayQueue) {
// Use the opportunity to cleanup any expired entries
cleanup(delayQueue);
Entry entry = cache.get(session);
if (entry == null) {
UserGroupInformation ugi;
if (isProxyUser) {
LOG.debug("{} Creating proxy user = {}", session, user);
ugi = ugiProvider.createProxyUGI(user, session.getLoginUser());
} else {
LOG.debug("{} Creating remote user = {}", session, user);
ugi = ugiProvider.createRemoteUser(user, session);
}
entry = new Entry(ticker, ugi, session);
delayQueue.offer(entry);
cache.put(session, entry);
}
entry.incrementRefCount();
return entry.getUGI();
}
}
/**
* Replace the given activity in the given queue. If not in the queue, adds it to the queue. The activity
* runs after the specified delay (the delay of the previous entry, if any, is ignored)
*
* @param group the queue - all activities within a queue are executed serially
* @param activity the activity
* @param delay the delay
* @param unit the delay unit
*/
public synchronized void replace(QueueGroups group, Activity activity, long delay, TimeUnit unit)
{
ActivityHolder holder = new ActivityHolder(activity, TimeUnit.MILLISECONDS.convert(delay, unit));
DelayQueue<ActivityHolder> queue = queues.get(group);
queue.remove(holder);
queue.offer(holder);
}