类java.util.concurrent.DelayQueue源码实例Demo

下面列出了 java.util.concurrent.DelayQueue 类的 API 使用实例代码及写法,也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: kop   文件: SystemTimer.java
/**
 * Builds the timer: a single-threaded task executor, the delay queue shared with the
 * timing wheel, the task counter, and the read/write lock pair.
 *
 * @param executorName name requested for the executor.
 *                     NOTE(review): currently unused - worker threads are always named
 *                     "system-timer-%d"; confirm whether the name should be derived from it.
 * @param tickMs       tick length handed to the {@code TimingWheel}
 * @param wheelSize    number of buckets handed to the {@code TimingWheel}
 * @param startMs      start time handed to the {@code TimingWheel}
 */
private SystemTimer(String executorName,
                    long tickMs,
                    int wheelSize,
                    long startMs) {
    // One non-daemon worker thread runs the tasks.
    this.taskExecutor = Executors.newFixedThreadPool(
        1, new ThreadFactoryBuilder()
            .setDaemon(false)
            .setNameFormat("system-timer-%d")
            .build()
    );
    // Diamond instead of a raw type so the element type is checked against the field.
    this.delayQueue = new DelayQueue<>();
    this.taskCounter = new AtomicInteger(0);
    // The wheel shares this timer's counter and delay queue.
    this.timingWheel = new TimingWheel(
        tickMs,
        wheelSize,
        startMs,
        taskCounter,
        delayQueue
    );
    this.readWriteLock = new ReentrantReadWriteLock();
    this.readLock = readWriteLock.readLock();
    this.writeLock = readWriteLock.writeLock();
    // Callback that feeds an entry back through addTimerTaskEntry.
    this.reinsert = this::addTimerTaskEntry;
}
 
源代码2 项目: java-interview   文件: CustomDelayQueue.java
/**
 * Demo entry point: starts two producers and one consumer that share a single delay queue.
 */
public static void main(String[] args) throws InterruptedException {
    final DelayQueue<DelayedElement> sharedQueue = new DelayQueue<>();

    // Start producer 1, then producer 2, both feeding the same queue.
    for (String producerName : new String[] {"生产者1", "生产者2"}) {
        producer(sharedQueue, producerName);
    }

    // Start the consumer.
    consumer(sharedQueue);

    /* Sample output (shown as printed by the program)
        生产者1,消息编号:1 发送时间:2019-6-12 20:38:37 延迟:2 秒 |执行时间:2019-6-12 20:38:39
        生产者2,消息编号:2 发送时间:2019-6-12 20:38:37 延迟:2 秒 |执行时间:2019-6-12 20:38:39
        生产者1,消息编号:3 发送时间:2019-6-12 20:38:41 延迟:4 秒 |执行时间:2019-6-12 20:38:45
        生产者1,消息编号:5 发送时间:2019-6-12 20:38:43 延迟:2 秒 |执行时间:2019-6-12 20:38:45
        ....
     */
}
 
源代码3 项目: java-interview   文件: CustomDelayQueue.java
/**
 * Starts a background thread that repeatedly sleeps for a random 1-5 seconds, builds a
 * message describing that delay, and puts it on the queue with the same delay value.
 * The thread stops when it is interrupted.
 *
 * @param delayQueue queue that receives the produced elements
 * @param name       producer label embedded in every message
 */
private static void producer(DelayQueue<DelayedElement> delayQueue, String name) {
    new Thread(new Runnable() {
        // One generator for the lifetime of the thread instead of one per iteration.
        private final Random random = new Random();

        @Override
        public void run() {
            while (true) {
                // Random duration of 1-5 seconds, in milliseconds.
                long time = 1000L * (random.nextInt(5) + 1);
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e) {
                    // Interruption is a stop request: keep the flag set for callers and
                    // end the loop rather than printing a trace and producing anyway.
                    Thread.currentThread().interrupt();
                    return;
                }
                // Compose the message body.
                String message = String.format("%s,消息编号:%s 发送时间:%s 延迟:%s 秒",
                        name, MESSAGENO.getAndIncrement(), DateFormat.getDateTimeInstance().format(new Date()), time / 1000);
                // Publish the message.
                delayQueue.put(new DelayedElement(message, time));
            }
        }
    }).start();
}
 
源代码4 项目: opensharding-spi-impl   文件: RetryThread.java
/**
 * Stores the retry queue and builds the executor that runs retries on daemon threads
 * named {@code zk-retry-N}, then registers a delayed shutdown hook for that executor.
 *
 * @param queue delay queue of operations handled by this thread
 */
public RetryThread(final DelayQueue<BaseOperation> queue) {
    this.queue = queue;

    // Factory producing numbered daemon threads.
    final ThreadFactory daemonFactory = new ThreadFactory() {

        private final AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(final Runnable runnable) {
            final Thread worker = new Thread(runnable);
            worker.setDaemon(true);
            worker.setName("zk-retry-" + threadIndex.incrementAndGet());
            return worker;
        }
    };
    // Work queue bounded to 10 pending tasks.
    final LinkedBlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<Runnable>(10);

    retryExecutor = new ThreadPoolExecutor(
            corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, pendingTasks, daemonFactory);
    addDelayedShutdownHook(retryExecutor, closeDelay, TimeUnit.SECONDS);
}
 
源代码5 项目: pxf   文件: UGICache.java
/**
 * Decrement reference count for the given session's UGI. Resets the time at which the UGI will
 * expire to UGI_CACHE_EXPIRY milliseconds in the future.
 *
 * <p>While holding the monitor of the segment's expiration queue, the entry is removed from
 * that queue and then either closed (when {@code cleanImmediatelyIfNoRefs} is set and the
 * entry is no longer in use) or re-offered with a freshly reset expiration time.
 *
 * @param session                  the session for which we want to release the UGI.
 * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it
 *                                 is now unreferenced).
 * @throws IllegalStateException if no entry is cached for the given session
 */
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {

    // Looked up before taking the queue lock.
    Entry entry = cache.get(session);

    if (entry == null) {
        throw new IllegalStateException("Cannot release UGI for this session; it is not cached: " + session);
    }

    DelayQueue<Entry> expirationQueue = getExpirationQueue(session.getSegmentId());

    // The queue object itself is the lock guarding the ref-count change and re-queueing.
    synchronized (expirationQueue) {
        entry.decrementRefCount();
        // Always take the entry out first; it is re-added below only if it is kept.
        expirationQueue.remove(entry);
        if (cleanImmediatelyIfNoRefs && entry.isNotInUse()) {
            closeUGI(entry);
        } else {
            // Reset expiration time and put it back in the queue
            // only when we don't close the UGI
            entry.resetTime();
            expirationQueue.offer(entry);
        }
    }
}
 
源代码6 项目: java   文件: DefaultDelayingQueue.java
/**
 * Adds {@code entry} to the delay queue, or, when an entry for the same data is already
 * known, moves the known entry forward if the new ready time is earlier than the stored one.
 *
 * @param q            delay queue holding the waiting entries
 * @param knownEntries index of waiting entries keyed by their data
 * @param entry        entry to insert
 */
private void insert(
    DelayQueue<WaitForEntry<T>> q, Map<T, WaitForEntry<T>> knownEntries, WaitForEntry entry) {
  WaitForEntry known = knownEntries.get((T) entry.data);

  // First time this data is seen: enqueue it and remember it.
  if (known == null) {
    q.offer(entry);
    knownEntries.put((T) entry.data, entry);
    return;
  }

  // A negative duration means the new ready time lies before the stored one.
  boolean readyEarlier =
      Duration.between(known.readyAtMillis, entry.readyAtMillis).isNegative();
  if (!readyEarlier) {
    return;
  }

  // Remove and re-add so the queue re-sorts the entry under its new ready time.
  q.remove(known);
  known.readyAtMillis = entry.readyAtMillis;
  q.add(known);
}
 
源代码7 项目: tutorials   文件: DelayQueueIntegrationTest.java
/**
 * Submits one producer and one consumer sharing a delay queue and checks that the consumer
 * has consumed every produced element once both tasks have finished (or 5 seconds elapsed).
 */
@Test
public void givenDelayQueue_whenProduceElement_thenShouldConsumeAfterGivenDelay() throws InterruptedException {
    //given
    ExecutorService executor = Executors.newFixedThreadPool(2);
    BlockingQueue<DelayObject> queue = new DelayQueue<>();
    int numberOfElementsToProduce = 2;
    int delayOfEachProducedMessageMilliseconds = 500;
    DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
    DelayQueueProducer producer
      = new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

    //when
    executor.submit(producer);
    executor.submit(consumer);

    //then
    // shutdown() has to come first: awaitTermination() only returns early once a shutdown
    // has been requested, otherwise it always blocks for the whole timeout.
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
    // Expected value first, so a failure message reads the right way round.
    assertEquals(numberOfElementsToProduce, consumer.numberOfConsumedElements.get());

}
 
源代码8 项目: openjdk-jdk9   文件: Iterate.java
/**
 * Checks iterator behaviour of a DelayQueue in two scenarios: an iterator created before
 * the queue is cleared, and removal through an iterator positioned on the second element.
 */
private static void realMain(String[] args) throws Throwable {
    Godot[] godots = new Godot[] { new Godot(), new Godot(), new Godot() };
    DelayQueue<Godot> q = new DelayQueue<>(Arrays.asList(godots));
    // Scenario 1: the iterator is obtained BEFORE the queue is cleared.
    Iterator<Godot> it = q.iterator();
    q.clear();
    // The iterator is still expected to report and return the first element.
    check(it.hasNext());
    equal(it.next(), godots[0]);
    // Removing through the iterator after clear() must leave the queue empty.
    it.remove();
    check(q.isEmpty());

    // Scenario 2: refill, advance to the second element and remove it via the iterator.
    q.addAll(Arrays.asList(godots));
    it = q.iterator();
    check(it.hasNext());
    it.next();
    equal(it.next(), godots[1]);
    it.remove();
    // Only godots[1] is gone; the other two remain.
    equal(q.size(), 2);
    check(q.contains(godots[0]));
    check(q.contains(godots[2]));
}
 
源代码9 项目: spectator   文件: Scheduler.java
/**
 * Execute the task and, if the options allow it, reschedule another execution.
 *
 * <p>The task is put back on the queue only when rescheduling is still wanted after the
 * run and the task is not done; otherwise it is marked as cancelled.
 *
 * @param queue
 *     Queue for the pool. This task will be added to the queue to schedule
 *     future executions.
 * @param stats
 *     Handle to stats that should be updated based on the execution of the
 *     task.
 */
@SuppressWarnings("PMD.AvoidCatchingThrowable")
void runAndReschedule(DelayQueue<DelayedTask> queue, Stats stats) {
  // Record the executing thread for the duration of the run; cleared in finally.
  thread = Thread.currentThread();
  // Anything other than RUN_ONCE starts out wanting another execution.
  boolean scheduleAgain = options.schedulingPolicy != Policy.RUN_ONCE;
  try {
    if (!isDone()) {
      task.run();
    }
  } catch (Throwable t) {
    // This catches Throwable because we cannot control the task and thus cannot
    // ensure it is well behaved with respect to exceptions.
    LOGGER.warn("task execution failed", t);
    stats.incrementUncaught(t);
    // After a failure the stopOnFailure option alone decides whether to continue,
    // overriding the policy-based value computed above.
    scheduleAgain = !options.stopOnFailure;
  } finally {
    thread = null;
    if (scheduleAgain && !isDone()) {
      updateNextExecutionTime(stats.skipped());
      queue.put(this);
    } else {
      cancelled = true;
    }
  }
}
 
源代码10 项目: groovy   文件: DefaultGroovyMethodsSupport.java
/**
 * Creates a new, empty queue of the same kind as {@code orig}.
 *
 * <p>An {@code ArrayBlockingQueue} keeps its total capacity, and both {@code PriorityQueue}
 * and {@code PriorityBlockingQueue} keep their comparator. Any queue type not listed here
 * yields a {@code LinkedList}.
 *
 * @param orig the queue whose type should be mirrored
 * @param <T>  element type
 * @return an empty queue similar to {@code orig}
 */
@SuppressWarnings("unchecked")
protected static <T> Queue<T> createSimilarQueue(Queue<T> orig) {
    if (orig instanceof ArrayBlockingQueue) {
        ArrayBlockingQueue<T> queue = (ArrayBlockingQueue<T>) orig;
        // size + remainingCapacity reconstructs the fixed capacity of the source queue.
        return new ArrayBlockingQueue<T>(queue.size() + queue.remainingCapacity());
    } else if (orig instanceof ArrayDeque) {
        return new ArrayDeque<T>();
    } else if (orig instanceof ConcurrentLinkedQueue) {
        return new ConcurrentLinkedQueue<T>();
    } else if (orig instanceof DelayQueue) {
        // Raw on purpose: T is not bounded by Delayed, so DelayQueue<T> cannot be written.
        return new DelayQueue();
    } else if (orig instanceof LinkedBlockingDeque) {
        return new LinkedBlockingDeque<T>();
    } else if (orig instanceof LinkedBlockingQueue) {
        return new LinkedBlockingQueue<T>();
    } else if (orig instanceof PriorityBlockingQueue) {
        // Carry the comparator over, as the PriorityQueue branch does; a null comparator
        // means natural ordering. 11 is the default initial capacity.
        return new PriorityBlockingQueue<T>(11, ((PriorityBlockingQueue<T>) orig).comparator());
    } else if (orig instanceof PriorityQueue) {
        return new PriorityQueue<T>(11, ((PriorityQueue<T>) orig).comparator());
    } else if (orig instanceof SynchronousQueue) {
        return new SynchronousQueue<T>();
    } else {
        return new LinkedList<T>();
    }
}
 
源代码11 项目: Zebra   文件: Cpt8_TimeoutManager.java
/**
 * Simulates sessions being used again while the session pool is being cleaned, which
 * pushes their clean-up time back. Runs on a background thread and records each
 * extension in {@code result}.
 *
 * @param list  sessions that were added to the queue; indexes 1 and 4 are extended
 * @param queue the delay queue holding the sessions (not read by this method)
 */
public static void updateObject(final List<Session> list,final DelayQueue<Session> queue){
	Thread thread=new Thread(){
		public void run(){
			try {
				// Iterating the queue elsewhere could race with these updates, but with the
				// sleep timings used here no exception occurs, so it is left unhandled.
				Thread.sleep(1000);	// sleep 1000ms
				// list(4) has a default lifetime of 2000ms; after 1000ms about 1000ms remain
				list.get(4).updateTriger();
				result.add("id:"+list.get(4).id+" 寿命延长\t currentTime:"+System.currentTimeMillis());
				Thread.sleep(1000);	// sleep another 1000ms
				// extend list(4) again; its total lifetime should now be 4000ms
				list.get(4).updateTriger();
				result.add("id:"+list.get(4).id+" 寿命延长\t currentTime:"+System.currentTimeMillis());
				// 2000ms have elapsed; list(1) started with 6000ms, so its total lifetime
				// should now be 8000ms
				list.get(1).updateTriger();
				// Log the session that was actually extended (index 1, not index 2).
				result.add("id:"+list.get(1).id+" 寿命延长\t currentTime:"+System.currentTimeMillis());
			} catch (InterruptedException e) {
				// Nothing left to do on interruption; keep the flag set instead of
				// silently discarding it.
				Thread.currentThread().interrupt();
			}
		}
	};
	thread.start();
}
 
源代码12 项目: Zebra   文件: Cpt8_TimeoutManager.java
/**
 * Demo driver: fills a delay queue with five sessions with seeded-random timeouts, prints
 * the insertion order and the queue's iteration order, then starts the monitor thread and
 * the thread that extends some sessions.
 */
public static void testDelayQueue(){
	final DelayQueue<Session> sessions=new DelayQueue<Session>();
	final Random rng=new Random(47);
	final StringBuilder insertionOrder=new StringBuilder();
	final List<Session> created=new ArrayList<Session>();
	// Create the sessions and add them to the queue.
	int remaining=5;
	while(remaining>0){
		long timeout=1000*(rng.nextInt(10)+1);	// integer in 1..10 times 1000 ms
		Session session=new Session(timeout);
		insertionOrder.append("id:"+session.id+"-"+timeout+" ");
		created.add(session);
		sessions.offer(session);
		remaining--;
	}
	System.out.println("=========================添加到队列中的顺序=========================");
	System.out.println(insertionOrder.toString());
	// Print the order the queue actually iterates in, for comparison.
	System.out.println("=========================队列中实际的顺序========================");
	System.out.println(iteratorDelayQueue(sessions));
	System.out.println("=========================启动清理线程==============================");
	// Start the monitoring/clean-up thread.
	monitorThread(sessions);
	// Simulate sessions being used again, which delays their clean-up
	// (can be left out to observe plain expiry).
	updateObject(created,sessions);
}
 
源代码13 项目: j2objc   文件: DelayQueueTest.java
/**
 * timed poll transfers elements across Executor tasks
 *
 * <p>Two tasks meet at a two-party barrier: one then waits in a timed poll while the
 * other puts a single element.
 */
public void testPollInExecutor() {
    final DelayQueue q = new DelayQueue();
    // Both tasks pass this barrier before the put / timed poll happen.
    final CheckedBarrier threadsStarted = new CheckedBarrier(2);
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    try (PoolCleaner cleaner = cleaner(executor)) {
        // Consumer task.
        executor.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                // Nothing has been put yet, so an untimed poll yields null.
                assertNull(q.poll());
                threadsStarted.await();
                // The timed poll must obtain the element put by the other task.
                assertNotNull(q.poll(LONG_DELAY_MS, MILLISECONDS));
                checkEmpty(q);
            }});

        // Producer task.
        executor.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadsStarted.await();
                q.put(new PDelay(1));
            }});
    }
}
 
源代码14 项目: openjdk-jdk9   文件: DelayQueueTest.java
/**
 * drainTo(c) moves every element of the queue into collection c, in order
 */
public void testDrainTo() {
    final DelayQueue queue = new DelayQueue();
    final PDelay[] expected = new PDelay[SIZE];
    int next = 0;
    while (next < SIZE) {
        expected[next] = new PDelay(next);
        queue.add(expected[next]);
        next++;
    }

    // First drain: everything leaves the queue in element order.
    final ArrayList drained = new ArrayList();
    queue.drainTo(drained);
    assertEquals(0, queue.size());
    for (int k = 0; k < SIZE; k++) {
        assertEquals(expected[k], drained.get(k));
    }

    // Second drain: re-add the first two elements and drain again.
    for (int k = 0; k < 2; k++) {
        queue.add(expected[k]);
    }
    assertFalse(queue.isEmpty());
    assertTrue(queue.contains(expected[0]));
    assertTrue(queue.contains(expected[1]));
    drained.clear();
    queue.drainTo(drained);
    assertEquals(0, queue.size());
    assertEquals(2, drained.size());
    for (int k = 0; k < 2; k++) {
        assertEquals(expected[k], drained.get(k));
    }
}
 
/**
 * {@code DelayQueue.add(Object)} is expected to be a bridge method that resolves to
 * {@code DelayQueue.add(Delayed)}.
 */
@Test
public void testWithSingleBoundParameterizedOnInstantiate() throws Exception {
	final Class<?> queueType = DelayQueue.class;

	// The Object-typed overload is the bridge.
	final Method erasedAdd = queueType.getMethod("add", Object.class);
	assertTrue(erasedAdd.isBridge());

	// The Delayed-typed overload is the real implementation.
	final Method typedAdd = queueType.getMethod("add", Delayed.class);
	assertFalse(typedAdd.isBridge());

	assertEquals(typedAdd, BridgeMethodResolver.findBridgedMethod(erasedAdd));
}
 
源代码16 项目: j2objc   文件: DelayQueueTest.java
/**
 * removeAll(c) removes exactly the elements of c and returns true when the queue changed
 */
public void testRemoveAll() {
    for (int subsetSize = 1; subsetSize < SIZE; subsetSize++) {
        final DelayQueue full = populatedQueue(SIZE);
        final DelayQueue subset = populatedQueue(subsetSize);
        assertTrue(full.removeAll(subset));
        assertEquals(SIZE - subsetSize, full.size());
        // Every element of the subset must now be absent from the full queue.
        for (int left = subsetSize; left > 0; left--) {
            final PDelay removed = (PDelay) subset.remove();
            assertFalse(full.contains(removed));
        }
    }
}
 
源代码17 项目: hbase   文件: DelayedUtil.java
/**
 * Takes the next expired element from the queue without propagating
 * {@link InterruptedException}.
 *
 * @param queue the queue to take from
 * @return the taken element, or null if the wait was interrupted; in that case the
 *         calling thread's interrupt flag is set again before returning.
 */
public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> queue) {
  E head = null;
  try {
    head = queue.take();
  } catch (InterruptedException interrupted) {
    // Re-assert the interrupt so callers can still observe it.
    Thread.currentThread().interrupt();
  }
  return head;
}
 
源代码18 项目: code   文件: DelayQueueTest.java
/**
 * Demo entry point: wires one producer and one consumer to a shared delay queue and
 * starts the consumer thread followed by the producer thread.
 */
public static void main(String[] args) throws InterruptedException {
    final BlockingQueue<DelayedDTO> sharedQueue = new DelayQueue<>();
    final DelayQueueTest.Product producerTask = new DelayQueueTest.Product(sharedQueue);
    final DelayQueueTest.Consumer consumerTask = new DelayQueueTest.Consumer(sharedQueue);

    // The consumer thread is created and started first.
    final Thread consumerThread = new Thread(consumerTask);
    consumerThread.start();

    final Thread producerThread = new Thread(producerTask);
    producerThread.start();

}
 
源代码19 项目: j2objc   文件: DelayQueueTest.java
/**
 * Constructing from a collection that contains a null element throws NPE
 */
public void testConstructor5() {
    // All slots but the last are filled, so the final slot stays null.
    final PDelay[] withTrailingNull = new PDelay[SIZE];
    int filled = 0;
    while (filled < SIZE - 1) {
        withTrailingNull[filled] = new PDelay(filled);
        filled++;
    }
    try {
        new DelayQueue(Arrays.asList(withTrailingNull));
        shouldThrow();
    } catch (NullPointerException success) {}
}
 
/**
 * {@code DelayQueue.add(Object)} is expected to be a bridge method that resolves to
 * {@code DelayQueue.add(Delayed)}.
 */
@Test
public void testWithSingleBoundParameterizedOnInstantiate() throws Exception {
	final Class<?> queueType = DelayQueue.class;

	// The Object-typed overload is the bridge.
	final Method erasedAdd = queueType.getMethod("add", Object.class);
	assertTrue(erasedAdd.isBridge());

	// The Delayed-typed overload is the real implementation.
	final Method typedAdd = queueType.getMethod("add", Delayed.class);
	assertFalse(typedAdd.isBridge());

	assertEquals(typedAdd, BridgeMethodResolver.findBridgedMethod(erasedAdd));
}
 
源代码21 项目: Zebra   文件: Cpt8_TimeoutManager.java
/**
 * Walks the queue with its iterator and renders every session on its own line so the
 * current contents can be inspected.
 *
 * @param queue the queue to dump
 * @return one {@code toString()} line per session, each terminated by a newline
 */
private static String iteratorDelayQueue(DelayQueue<Session> queue){
	final StringBuilder dump=new StringBuilder();
	for(Session session:queue){
		dump.append(session.toString()).append("\n");
	}
	return dump.toString();
}
 
源代码22 项目: DataLink   文件: TimingWheel.java
public TimingWheel(Long tickMs, Integer wheelSize, Long startMs, AtomicInteger taskCounter, DelayQueue<TimerTaskList> queue) {
    this.tickMs = tickMs;
    this.wheelSize = wheelSize;
    this.startMs = startMs;
    this.taskCounter = taskCounter;
    this.queue = queue;

    this.interval = tickMs * wheelSize;
    this.buckets = new TimerTaskList[wheelSize];
    for (int i = 0; i < buckets.length; i++) {
        this.buckets[i] = new TimerTaskList(taskCounter);
    }
    this.currentTime = startMs - (startMs % tickMs); // rounding down to multiple of tickMs
}
 
源代码23 项目: xian   文件: CuratorFrameworkImpl.java
/**
 * Builds the framework instance from the builder's settings: the ZooKeeper client with a
 * watcher that forwards watched events, the listener containers, the background operation
 * queue, and the remaining collaborators. The initial state is {@code LATENT}.
 *
 * @param builder source of all configuration values read below
 */
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
    ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
    // The anonymous Watcher wraps each WatchedEvent in a WATCHED CuratorEvent (with the path
    // passed through unfixForNamespace) and hands it to processEvent.
    this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
    {
        @Override
        public void process(WatchedEvent watchedEvent)
        {
            CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null);
            processEvent(event);
        }
    }, builder.getRetryPolicy(), builder.canBeReadOnly());

    listeners = new ListenerContainer<CuratorListener>();
    unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
    // Queue of background operations, ordered by their delay.
    backgroundOperations = new DelayQueue<OperationAndData<?>>();
    // NOTE(review): 'this' is passed to collaborators while construction is still in
    // progress, so the order of these assignments may matter - confirm before reordering.
    namespace = new NamespaceImpl(this, builder.getNamespace());
    threadFactory = getThreadFactory(builder);
    maxCloseWaitMs = builder.getMaxCloseWaitMs();
    connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
    compressionProvider = builder.getCompressionProvider();
    aclProvider = builder.getAclProvider();
    state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
    useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();

    // Copy the builder's default data (or use an empty array when none is set) so this
    // instance does not share the builder's array.
    byte[] builderDefaultData = builder.getDefaultData();
    defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
    authInfos = buildAuths(builder);

    failedDeleteManager = new FailedDeleteManager(this);
    namespaceFacadeCache = new NamespaceFacadeCache(this);
}
 
源代码24 项目: j2objc   文件: DelayQueueTest.java
/**
 * the iterator visits every element of the queue exactly SIZE times in total
 */
public void testIterator() {
    final DelayQueue queue = populatedQueue(SIZE);
    final Iterator cursor = queue.iterator();
    int visited = 0;
    for (; cursor.hasNext(); visited++) {
        assertTrue(queue.contains(cursor.next()));
    }
    assertEquals(visited, SIZE);
    assertIteratorExhausted(cursor);
}
 
源代码25 项目: pxf   文件: UGICache.java
/**
 * This method is not thread-safe, and is intended to be called in tests.
 *
 * @return the sum of the sizes of the internal queues
 */
int allQueuesSize() {
    int total = 0;
    // Wildcard instead of a raw type: only size() is needed, so the element type is irrelevant.
    for (DelayQueue<?> queue : expirationQueueMap.values()) {
        total += queue.size();
    }
    return total;
}
 
源代码26 项目: pxf   文件: UGICache.java
/**
 * Reports whether the session has a cached entry that is also present in its segment's
 * expiration queue. The queue lookup is linear in the number of entries, so this should
 * only be called in tests.
 *
 * @param session the session to look for
 * @return true if the session's entry exists and is held by the expiration queue
 */
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
boolean contains(SessionId session) {
    final DelayQueue<Entry> segmentQueue = getExpirationQueue(session.getSegmentId());
    synchronized (segmentQueue) {
        final Entry cached = cache.get(session);
        if (cached == null) {
            return false;
        }
        return segmentQueue.contains(cached);
    }
}
 
源代码27 项目: pxf   文件: UGICache.java
/**
 * Get the queue of cache entries associated with a segment, creating it if it doesn't yet
 * exist. This lets us lazily populate the expirationQueueMap.
 *
 * @param segmentId the segment whose queue is requested
 * @return the {@link DelayQueue} associated to the segment.
 */
private DelayQueue<Entry> getExpirationQueue(Integer segmentId) {
    // First lookup happens without holding the map's monitor.
    // NOTE(review): this read is only safe if expirationQueueMap is a thread-safe map - confirm.
    DelayQueue<Entry> queue = expirationQueueMap.get(segmentId);
    if (queue == null) {
        synchronized (expirationQueueMap) {
            // Look again under the lock: another thread may have created the queue meanwhile.
            queue = expirationQueueMap.get(segmentId);
            if (queue == null) {
                queue = new DelayQueue<>();
                expirationQueueMap.put(segmentId, queue);
            }
        }
    }
    return queue;
}
 
源代码28 项目: oxygen   文件: Wheel.java
public Wheel(long duration, int wheelSize, long startTime, DelayQueue<Slot> delayQueue) {
  this.duration = duration;
  this.interval = duration * wheelSize;
  this.slots = Slot.createSlots(wheelSize);
  // 取整
  this.currentTime = startTime - (startTime % duration);
  this.delayQueue = delayQueue;
}
 
源代码29 项目: j2objc   文件: DelayQueueTest.java
/**
 * put never blocks, even when nobody is taking
 */
public void testPutWithTake() throws InterruptedException {
    final DelayQueue queue = new DelayQueue();
    final int puts = 4;
    final Thread producer = newStartedThread(new CheckedRunnable() {
        public void realRun() {
            for (int n = 0; n < puts; n++) {
                queue.put(new PDelay(0));
            }
        }});

    awaitTermination(producer);
    assertEquals(puts, queue.size());
}
 
源代码30 项目: j2objc   文件: DelayQueueTest.java
/**
 * element returns the head of the queue, or throws NSEE when the queue is empty
 */
public void testElement() {
    final DelayQueue queue = populatedQueue(SIZE);
    int expected = 0;
    while (expected < SIZE) {
        assertEquals(new PDelay(expected), queue.element());
        queue.poll();
        expected++;
    }
    // The queue is empty now, so element() has to throw.
    try {
        queue.element();
        shouldThrow();
    } catch (NoSuchElementException success) {}
}
 
 类所在包
 同包方法