下面列出了怎么用java.util.concurrent.DelayQueue的API类实例代码及写法,或者点击链接到github查看源代码。
private SystemTimer(String executorName,
long tickMs,
int wheelSize,
long startMs) {
this.taskExecutor = Executors.newFixedThreadPool(
1, new ThreadFactoryBuilder()
.setDaemon(false)
.setNameFormat("system-timer-%d")
.build()
);
this.delayQueue = new DelayQueue();
this.taskCounter = new AtomicInteger(0);
this.timingWheel = new TimingWheel(
tickMs,
wheelSize,
startMs,
taskCounter,
delayQueue
);
this.readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
this.reinsert = timerTaskEntry -> addTimerTaskEntry(timerTaskEntry);
}
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedElement> delayQueue = new DelayQueue<>();
// 生产者1
producer(delayQueue, "生产者1");
// 生产者2
producer(delayQueue, "生产者2");
// 消费者
consumer(delayQueue);
/* 执行结果
生产者1,消息编号:1 发送时间:2019-6-12 20:38:37 延迟:2 秒 |执行时间:2019-6-12 20:38:39
生产者2,消息编号:2 发送时间:2019-6-12 20:38:37 延迟:2 秒 |执行时间:2019-6-12 20:38:39
生产者1,消息编号:3 发送时间:2019-6-12 20:38:41 延迟:4 秒 |执行时间:2019-6-12 20:38:45
生产者1,消息编号:5 发送时间:2019-6-12 20:38:43 延迟:2 秒 |执行时间:2019-6-12 20:38:45
....
*/
}
private static void producer(DelayQueue<DelayedElement> delayQueue, String name) {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
// 产生 1~5 秒的随机数
long time = 1000L * (new Random().nextInt(5) + 1);
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 组合消息体
String message = String.format("%s,消息编号:%s 发送时间:%s 延迟:%s 秒",
name, MESSAGENO.getAndIncrement(), DateFormat.getDateTimeInstance().format(new Date()), time / 1000);
// 生产消息
delayQueue.put(new DelayedElement(message, time));
}
}
}).start();
}
public RetryThread(final DelayQueue<BaseOperation> queue) {
this.queue = queue;
retryExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(final Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
thread.setName("zk-retry-" + threadIndex.incrementAndGet());
return thread;
}
});
addDelayedShutdownHook(retryExecutor, closeDelay, TimeUnit.SECONDS);
}
/**
* Decrement reference count for the given session's UGI. Resets the time at which the UGI will
* expire to UGI_CACHE_EXPIRY milliseconds in the future.
*
* @param session the session for which we want to release the UGI.
* @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it
* is now unreferenced).
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {
Entry entry = cache.get(session);
if (entry == null) {
throw new IllegalStateException("Cannot release UGI for this session; it is not cached: " + session);
}
DelayQueue<Entry> expirationQueue = getExpirationQueue(session.getSegmentId());
synchronized (expirationQueue) {
entry.decrementRefCount();
expirationQueue.remove(entry);
if (cleanImmediatelyIfNoRefs && entry.isNotInUse()) {
closeUGI(entry);
} else {
// Reset expiration time and put it back in the queue
// only when we don't close the UGI
entry.resetTime();
expirationQueue.offer(entry);
}
}
}
private void insert(
DelayQueue<WaitForEntry<T>> q, Map<T, WaitForEntry<T>> knownEntries, WaitForEntry entry) {
WaitForEntry existing = knownEntries.get((T) entry.data);
if (existing != null) {
if (Duration.between(existing.readyAtMillis, entry.readyAtMillis).isNegative()) {
q.remove(existing);
existing.readyAtMillis = entry.readyAtMillis;
q.add(existing);
}
return;
}
q.offer(entry);
knownEntries.put((T) entry.data, entry);
}
@Test
public void givenDelayQueue_whenProduceElement_thenShouldConsumeAfterGivenDelay() throws InterruptedException {
//given
ExecutorService executor = Executors.newFixedThreadPool(2);
BlockingQueue<DelayObject> queue = new DelayQueue<>();
int numberOfElementsToProduce = 2;
int delayOfEachProducedMessageMilliseconds = 500;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer
= new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
//when
executor.submit(producer);
executor.submit(consumer);
//then
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}
private static void realMain(String[] args) throws Throwable {
Godot[] godots = new Godot[] { new Godot(), new Godot(), new Godot() };
DelayQueue<Godot> q = new DelayQueue<>(Arrays.asList(godots));
Iterator<Godot> it = q.iterator();
q.clear();
check(it.hasNext());
equal(it.next(), godots[0]);
it.remove();
check(q.isEmpty());
q.addAll(Arrays.asList(godots));
it = q.iterator();
check(it.hasNext());
it.next();
equal(it.next(), godots[1]);
it.remove();
equal(q.size(), 2);
check(q.contains(godots[0]));
check(q.contains(godots[2]));
}
/**
* Execute the task and if reschedule another execution.
*
* @param queue
* Queue for the pool. This task will be added to the queue to schedule
* future executions.
* @param stats
* Handle to stats that should be updated based on the execution of the
* task.
*/
@SuppressWarnings("PMD.AvoidCatchingThrowable")
void runAndReschedule(DelayQueue<DelayedTask> queue, Stats stats) {
thread = Thread.currentThread();
boolean scheduleAgain = options.schedulingPolicy != Policy.RUN_ONCE;
try {
if (!isDone()) {
task.run();
}
} catch (Throwable t) {
// This catches Throwable because we cannot control the task and thus cannot
// ensure it is well behaved with respect to exceptions.
LOGGER.warn("task execution failed", t);
stats.incrementUncaught(t);
scheduleAgain = !options.stopOnFailure;
} finally {
thread = null;
if (scheduleAgain && !isDone()) {
updateNextExecutionTime(stats.skipped());
queue.put(this);
} else {
cancelled = true;
}
}
}
@SuppressWarnings("unchecked")
protected static <T> Queue<T> createSimilarQueue(Queue<T> orig) {
if (orig instanceof ArrayBlockingQueue) {
ArrayBlockingQueue queue = (ArrayBlockingQueue) orig;
return new ArrayBlockingQueue<T>(queue.size() + queue.remainingCapacity());
} else if (orig instanceof ArrayDeque) {
return new ArrayDeque<T>();
} else if (orig instanceof ConcurrentLinkedQueue) {
return new ConcurrentLinkedQueue<T>();
} else if (orig instanceof DelayQueue) {
return new DelayQueue();
} else if (orig instanceof LinkedBlockingDeque) {
return new LinkedBlockingDeque<T>();
} else if (orig instanceof LinkedBlockingQueue) {
return new LinkedBlockingQueue<T>();
} else if (orig instanceof PriorityBlockingQueue) {
return new PriorityBlockingQueue<T>();
} else if (orig instanceof PriorityQueue) {
return new PriorityQueue<T>(11, ((PriorityQueue) orig).comparator());
} else if (orig instanceof SynchronousQueue) {
return new SynchronousQueue<T>();
} else {
return new LinkedList<T>();
}
}
/**
* 模拟在清理session池的时候,session因重新调用而导致清理时间延迟
* @param list
*/
public static void updateObject(final List<Session> list,final DelayQueue<Session> queue){
Thread thread=new Thread(){
public void run(){
try {
//对于iteratorDelayQueue可能存在同步的问题,但是这里因sleep时间点的问题,不会发生异常
//暂时不需要处理
Thread.sleep(1000); //睡眠1000ms
//list(4)默认生命周期是2000ms,睡眠1000后,现在应该还有1000ms左右
list.get(4).updateTriger();
result.add("id:"+list.get(4).id+" 寿命延长\t currentTime:"+System.currentTimeMillis());
Thread.sleep(1000); //再次睡眠1000ms
//再次延长list(4),这次延时后list(4)的总生命周期应该是4000ms
list.get(4).updateTriger();
result.add("id:"+list.get(4).id+" 寿命延长\t currentTime:"+System.currentTimeMillis());
//执行到此处时,一共睡眠了2000ms,list(1)的初始生命是6000ms,此时延迟应该总共生命周期为8000ms
list.get(1).updateTriger();
result.add("id:"+list.get(2).id+" 寿命延长\t currentTime:"+System.currentTimeMillis());
} catch (InterruptedException e) { }
}
};
thread.start();
}
public static void testDelayQueue(){
DelayQueue<Session> queue=new DelayQueue<Session>();
Random random=new Random(47);
StringBuilder sb=new StringBuilder();
List<Session> list=new ArrayList<Session>();
//生产对象添加到队列中
for(int i=0;i<5;i++){
long timeout=(random.nextInt(10)+1)*1000; //11以内的整数乘以1000毫秒
Session temp=new Session(timeout);
sb.append("id:"+temp.id+"-").append(timeout).append(" ");
list.add(temp);
queue.offer(temp);
}
System.out.println("=========================添加到队列中的顺序=========================");
System.out.println(sb.toString());
//可以先观察queue的排序结果
System.out.println("=========================队列中实际的顺序========================");
System.out.println(iteratorDelayQueue(queue));
System.out.println("=========================启动清理线程==============================");
monitorThread(queue); //启动监控清理线程
//可先不执行延迟清理,进行观察
updateObject(list,queue); //模拟因session最新被调用,而延迟清理
}
/**
* timed poll transfers elements across Executor tasks
*/
public void testPollInExecutor() {
final DelayQueue q = new DelayQueue();
final CheckedBarrier threadsStarted = new CheckedBarrier(2);
final ExecutorService executor = Executors.newFixedThreadPool(2);
try (PoolCleaner cleaner = cleaner(executor)) {
executor.execute(new CheckedRunnable() {
public void realRun() throws InterruptedException {
assertNull(q.poll());
threadsStarted.await();
assertNotNull(q.poll(LONG_DELAY_MS, MILLISECONDS));
checkEmpty(q);
}});
executor.execute(new CheckedRunnable() {
public void realRun() throws InterruptedException {
threadsStarted.await();
q.put(new PDelay(1));
}});
}
}
/**
* drainTo(c) empties queue into another collection c
*/
public void testDrainTo() {
DelayQueue q = new DelayQueue();
PDelay[] elems = new PDelay[SIZE];
for (int i = 0; i < SIZE; ++i) {
elems[i] = new PDelay(i);
q.add(elems[i]);
}
ArrayList l = new ArrayList();
q.drainTo(l);
assertEquals(0, q.size());
for (int i = 0; i < SIZE; ++i)
assertEquals(elems[i], l.get(i));
q.add(elems[0]);
q.add(elems[1]);
assertFalse(q.isEmpty());
assertTrue(q.contains(elems[0]));
assertTrue(q.contains(elems[1]));
l.clear();
q.drainTo(l);
assertEquals(0, q.size());
assertEquals(2, l.size());
for (int i = 0; i < 2; ++i)
assertEquals(elems[i], l.get(i));
}
@Test
public void testWithSingleBoundParameterizedOnInstantiate() throws Exception {
Method bridgeMethod = DelayQueue.class.getMethod("add", Object.class);
assertTrue(bridgeMethod.isBridge());
Method actualMethod = DelayQueue.class.getMethod("add", Delayed.class);
assertFalse(actualMethod.isBridge());
assertEquals(actualMethod, BridgeMethodResolver.findBridgedMethod(bridgeMethod));
}
/**
* removeAll(c) removes only those elements of c and reports true if changed
*/
public void testRemoveAll() {
for (int i = 1; i < SIZE; ++i) {
DelayQueue q = populatedQueue(SIZE);
DelayQueue p = populatedQueue(i);
assertTrue(q.removeAll(p));
assertEquals(SIZE - i, q.size());
for (int j = 0; j < i; ++j) {
PDelay x = (PDelay)(p.remove());
assertFalse(q.contains(x));
}
}
}
/**
* @return null (if an interrupt) or an instance of E; resets interrupt on calling thread.
*/
public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> queue) {
try {
return queue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue<DelayedDTO> q = new DelayQueue<>();
DelayQueueTest.Product p = new DelayQueueTest.Product(q);
DelayQueueTest.Consumer c = new DelayQueueTest.Consumer(q);
new Thread(c).start();
new Thread(p).start();
}
/**
* Initializing from Collection with some null elements throws NPE
*/
public void testConstructor5() {
PDelay[] a = new PDelay[SIZE];
for (int i = 0; i < SIZE - 1; ++i)
a[i] = new PDelay(i);
try {
new DelayQueue(Arrays.asList(a));
shouldThrow();
} catch (NullPointerException success) {}
}
@Test
public void testWithSingleBoundParameterizedOnInstantiate() throws Exception {
Method bridgeMethod = DelayQueue.class.getMethod("add", Object.class);
assertTrue(bridgeMethod.isBridge());
Method actualMethod = DelayQueue.class.getMethod("add", Delayed.class);
assertFalse(actualMethod.isBridge());
assertEquals(actualMethod, BridgeMethodResolver.findBridgedMethod(bridgeMethod));
}
/**
* 遍历队列以便观察结果
* @param queue
*/
private static String iteratorDelayQueue(DelayQueue<Session> queue){
Iterator<Session> it=queue.iterator();
StringBuilder sb=new StringBuilder();
while(it.hasNext()){
Session dt=it.next();
sb.append(dt.toString()+"\n");
}
return sb.toString();
}
public TimingWheel(Long tickMs, Integer wheelSize, Long startMs, AtomicInteger taskCounter, DelayQueue<TimerTaskList> queue) {
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.startMs = startMs;
this.taskCounter = taskCounter;
this.queue = queue;
this.interval = tickMs * wheelSize;
this.buckets = new TimerTaskList[wheelSize];
for (int i = 0; i < buckets.length; i++) {
this.buckets[i] = new TimerTaskList(taskCounter);
}
this.currentTime = startMs - (startMs % tickMs); // rounding down to multiple of tickMs
}
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null);
processEvent(event);
}
}, builder.getRetryPolicy(), builder.canBeReadOnly());
listeners = new ListenerContainer<CuratorListener>();
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
backgroundOperations = new DelayQueue<OperationAndData<?>>();
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
maxCloseWaitMs = builder.getMaxCloseWaitMs();
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
byte[] builderDefaultData = builder.getDefaultData();
defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
authInfos = buildAuths(builder);
failedDeleteManager = new FailedDeleteManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);
}
/**
* iterator iterates through all elements
*/
public void testIterator() {
DelayQueue q = populatedQueue(SIZE);
int i = 0;
Iterator it = q.iterator();
while (it.hasNext()) {
assertTrue(q.contains(it.next()));
++i;
}
assertEquals(i, SIZE);
assertIteratorExhausted(it);
}
/**
* This method is not thread-safe, and is intended to be called in tests.
*
* @return the sum of the sizes of the internal queues
*/
int allQueuesSize() {
int count = 0;
for (DelayQueue queue : expirationQueueMap.values()) {
count += queue.size();
}
return count;
}
/**
* This method is O(n) in the number of cache entries and should only be called in tests.
*
* @param session
* @return determine whether the session is in the internal cache
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
boolean contains(SessionId session) {
DelayQueue<Entry> expirationQueue = getExpirationQueue(session.getSegmentId());
synchronized (expirationQueue) {
Entry entry = cache.get(session);
return entry != null && expirationQueue.contains(entry);
}
}
/**
* Get the queue of cache entries associated with a segment, creating it if it doesn't yet
* exist. This lets us lazily populate the expirationQueueMap.
*
* @param segmentId
* @return the {@link DelayQueue} associated to the segment.
*/
private DelayQueue<Entry> getExpirationQueue(Integer segmentId) {
DelayQueue<Entry> queue = expirationQueueMap.get(segmentId);
if (queue == null) {
synchronized (expirationQueueMap) {
queue = expirationQueueMap.get(segmentId);
if (queue == null) {
queue = new DelayQueue<>();
expirationQueueMap.put(segmentId, queue);
}
}
}
return queue;
}
public Wheel(long duration, int wheelSize, long startTime, DelayQueue<Slot> delayQueue) {
this.duration = duration;
this.interval = duration * wheelSize;
this.slots = Slot.createSlots(wheelSize);
// 取整
this.currentTime = startTime - (startTime % duration);
this.delayQueue = delayQueue;
}
/**
* put doesn't block waiting for take
*/
public void testPutWithTake() throws InterruptedException {
final DelayQueue q = new DelayQueue();
Thread t = newStartedThread(new CheckedRunnable() {
public void realRun() {
q.put(new PDelay(0));
q.put(new PDelay(0));
q.put(new PDelay(0));
q.put(new PDelay(0));
}});
awaitTermination(t);
assertEquals(4, q.size());
}
/**
* element returns next element, or throws NSEE if empty
*/
public void testElement() {
DelayQueue q = populatedQueue(SIZE);
for (int i = 0; i < SIZE; ++i) {
assertEquals(new PDelay(i), q.element());
q.poll();
}
try {
q.element();
shouldThrow();
} catch (NoSuchElementException success) {}
}