java.util.concurrent.BlockingQueue#peek ( )源码实例Demo

下面列出了java.util.concurrent.BlockingQueue#peek ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: sourcerer   文件: SubscriptionWorker.java
private List<Update<T>> getUpdateBatch(
        final BlockingQueue<Update<T>> updatesQueue) throws InterruptedException {
    Update<T> update = updatesQueue.poll(1000, TimeUnit.MILLISECONDS);
    if (update != null) {
        // We have at least one pending update, check if there's more!
        List<Update<T>> updatesBatch = new ArrayList<>();
        updatesBatch.add(update);
        if (updatesQueue.peek() != null) {
            logger.debug("Subscription received update, queue not empty, draining ...");
            updatesQueue.drainTo(updatesBatch);
        } else {
            logger.debug("Subscription received single update");
        }
        return updatesBatch;
    } else {
        // Nothing pending, nothing to see here
        logger.trace("No update (yet)");
        return null;
    }
}
 
源代码2 项目: incubator-gobblin   文件: CouchbaseWriterTest.java
private void drainQueue(BlockingQueue<Pair<AbstractDocument, Future>> queue, int threshold, long sleepTime,
    TimeUnit sleepUnit, List<Pair<AbstractDocument, Future>> failedFutures) {
  while (queue.remainingCapacity() < threshold) {
    if (sleepTime > 0) {
      Pair<AbstractDocument, Future> topElement = queue.peek();
      if (topElement != null) {
        try {
          topElement.getSecond().get(sleepTime, sleepUnit);
        } catch (Exception te) {
          failedFutures.add(topElement);
        }
        queue.poll();
      }
    }
  }
}
 
源代码3 项目: swift-k   文件: NIOSender.java
private void sendAllEntries(BlockingQueue<NIOSendEntry> q, WritableByteChannel c, SelectionKey key) {
    NIOSendEntry e = null;
    while (true) {
        // get one entry from queue
           synchronized(queues) {
               e = q.peek();
               if (e == null) {
                   queues.remove(c);
                   key.cancel();
                   registered.remove(c);
                   return;
               }
           }
           if (sendAllBuffers(e, c, key)) {
               notifySender(e);
               q.remove();
           }
           else {
               return;
           }
    }
}
 
源代码4 项目: ehcache3   文件: InvocationScopedEventSink.java
private void fireWaiters(Set<StoreEventListener<K, V>> listeners, BlockingQueue<FireableStoreEventHolder<K, V>> orderedQueue) {
  FireableStoreEventHolder<K, V> head;
  while ((head = orderedQueue.peek()) != null && head.isFireable()) {
    if (head.markFired()) {
      // Only proceed if I am the one marking fired
      // Do not notify failed events
      for (StoreEventListener<K, V> listener : listeners) {
        head.fireOn(listener);
      }
      orderedQueue.poll(); // Remove the event I just handled
    } else {
      // Someone else fired it - stopping there
      break;
    }
  }
}
 
源代码5 项目: DDMQ   文件: BrokerController.java
public long headSlowTimeMills(BlockingQueue<Runnable> q) {
    long slowTimeMills = 0;
    final Runnable peek = q.peek();
    if (peek != null) {
        RequestTask rt = BrokerFastFailure.castRunnable(peek);
        slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
    }

    if (slowTimeMills < 0)
        slowTimeMills = 0;

    return slowTimeMills;
}
 
源代码6 项目: DDMQ   文件: BrokerFastFailure.java
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
    while (true) {
        try {
            if (!blockingQueue.isEmpty()) {
                final Runnable runnable = blockingQueue.peek();
                if (null == runnable) {
                    break;
                }
                final RequestTask rt = castRunnable(runnable);
                if (rt == null || rt.isStopRun()) {
                    break;
                }

                final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                if (behind >= maxWaitTimeMillsInQueue) {
                    if (blockingQueue.remove(runnable)) {
                        rt.setStopRun(true);
                        rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
                    }
                } else {
                    break;
                }
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }
}
 
源代码7 项目: rocketmq-4.3.0   文件: BrokerController.java
public long headSlowTimeMills(BlockingQueue<Runnable> q) {
    long slowTimeMills = 0;
    final Runnable peek = q.peek();
    if (peek != null) {
        RequestTask rt = BrokerFastFailure.castRunnable(peek);
        slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
    }

    if (slowTimeMills < 0) {
        slowTimeMills = 0;
    }

    return slowTimeMills;
}
 
源代码8 项目: rocketmq-4.3.0   文件: BrokerFastFailure.java
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
    while (true) {
        try {
            if (!blockingQueue.isEmpty()) {
                final Runnable runnable = blockingQueue.peek();
                if (null == runnable) {
                    break;
                }
                final RequestTask rt = castRunnable(runnable);
                if (rt == null || rt.isStopRun()) {
                    break;
                }

                final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                if (behind >= maxWaitTimeMillsInQueue) {
                    if (blockingQueue.remove(runnable)) {
                        rt.setStopRun(true);
                        rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
                    }
                } else {
                    break;
                }
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }
}
 
源代码9 项目: nifi   文件: FileSystemRepository.java
private boolean deleteBasedOnTimestamp(final BlockingQueue<ArchiveInfo> fileQueue, final long removalTimeThreshold) throws IOException {
    // check next file's last mod time.
    final ArchiveInfo nextFile = fileQueue.peek();
    if (nextFile == null) {
        // Continue on to queue up the files, in case the next file must be destroyed based on time.
        return false;
    }

    // If the last mod time indicates that it should be removed, just continue loop.
    final long oldestArchiveDate = getLastModTime(nextFile.toPath());
    return (oldestArchiveDate <= removalTimeThreshold);
}
 
源代码10 项目: rocketmq-read   文件: BrokerFastFailure.java
/**
 * 在各个队列里清除超时的请求,并返回给客户端系统繁忙
 * @param blockingQueue 队列
 * @param maxWaitTimeMillsInQueue 超时
 */
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
    while (true) {
        try {
            if (!blockingQueue.isEmpty()) {
                final Runnable runnable = blockingQueue.peek();
                if (null == runnable) {
                    break;
                }
                final RequestTask rt = castRunnable(runnable);
                if (rt == null || rt.isStopRun()) {
                    break;
                }

                final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                if (behind >= maxWaitTimeMillsInQueue) {
                    if (blockingQueue.remove(runnable)) {
                        rt.setStopRun(true);
                        rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
                    }
                } else {
                    break;
                }
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }
}
 
源代码11 项目: PeerWasp   文件: FileEventManagerTest.java
/**
 * This test simulates a create event and waits ActionExecutor.ACTION_WAIT_TIME_MS amount of
 * time for the event to be handled. After that, a move is simulated using a delete event on
 * the same file and a create event on a new file with the same content (but different name).
 */
@Test
public void fromDeleteToMoveTest(){
	//handle artificial create event, wait for handling
	manager.onLocalFileCreated(Paths.get(filePaths.get(7)));
	BlockingQueue<FileComponent> actionsToCheck = manager.getFileComponentQueue().getQueue();;
	FileComponent file1 = actionsToCheck.peek();
	sleepMillis(config.getAggregationIntervalInMillis() * 2);

	//check if exactly one element exists in the queue
	assertTrue(actionsToCheck.size() == 0);

	//initiate delete event
	long start = System.currentTimeMillis();

	manager.onLocalFileDeleted(Paths.get(filePaths.get(7)));
	assertTrue(actionsToCheck.size() == 1);

	//initiate re-creation, ensure that all happens in time
	manager.onLocalFileCreated(Paths.get(filePaths.get(8)));
	FileComponent file2 = actionsToCheck.peek();
	assertTrue(actionsToCheck.size() == 1);
	System.out.println(actionsToCheck.peek().getAction().getCurrentState().getClass());
	assertTrue(actionsToCheck.peek().getAction().getCurrentState() instanceof LocalMoveState);

	long end = System.currentTimeMillis();
	assertTrue(end - start <= config.getAggregationIntervalInMillis());
	sleepMillis(config.getAggregationIntervalInMillis() * 2);

	//cleanup
	deleteFile(Paths.get(filePaths.get(8)));
	sleepMillis(config.getAggregationIntervalInMillis() * 2);
	assertTrue(manager.getFileTree().getFile(files.get(8).toPath()) == null);
	assertTrue(actionsToCheck.size() == 0);
	assertTrue(file1.getAction().getCurrentState() instanceof InitialState);
	assertTrue(file1.getAction().getCurrentState() instanceof InitialState);
}
 
源代码12 项目: DDMQ   文件: BrokerController.java
public long headSlowTimeMills(BlockingQueue<Runnable> q) {
    long slowTimeMills = 0;
    final Runnable peek = q.peek();
    if (peek != null) {
        RequestTask rt = BrokerFastFailure.castRunnable(peek);
        slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
    }

    if (slowTimeMills < 0)
        slowTimeMills = 0;

    return slowTimeMills;
}
 
public long headSlowTimeMills(BlockingQueue<Runnable> q) {
    long slowTimeMills = 0;
    final Runnable peek = q.peek();
    if (peek != null) {
        RequestTask rt = BrokerFastFailure.castRunnable(peek);
        slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
    }

    if (slowTimeMills < 0)
        slowTimeMills = 0;

    return slowTimeMills;
}
 
源代码14 项目: hadoop   文件: FairCallQueue.java
/**
 * Peek, like poll, provides no strict consistency.
 */
@Override
public E peek() {
  BlockingQueue<E> q = this.getFirstNonEmptyQueue(0);
  if (q == null) {
    return null;
  } else {
    return q.peek();
  }
}
 
源代码15 项目: PeerWasp   文件: FileEventManagerTest.java
/**
 * This test simulates the the process of creating AND moving/renaming a file
 * before the upload to the network was triggered. Therefore, the old file should
 * be ignored (initial state, where execute does nothing) and the new file should
 * be pushed as a create.
 */

@Test
public void createOnLocalMove(){

	//sleepMillis(ActionExecutor.ACTION_WAIT_TIME_MS*3);
	long start = System.currentTimeMillis();

	BlockingQueue<FileComponent> actionsToCheck = manager.getFileComponentQueue().getQueue();;
	assertTrue(actionsToCheck.size() == 0);

	manager.onLocalFileCreated(Paths.get(filePaths.get(4)));

	sleepMillis(10);

	//move the file LOCALLY

	Paths.get(filePaths.get(4)).toFile().delete();
	manager.onLocalFileDeleted(Paths.get(filePaths.get(4)));
	sleepMillis(10);

	manager.onLocalFileCreated(Paths.get(filePaths.get(5)));
	//sleepMillis(10);

	FileComponent head = actionsToCheck.peek();
	System.out.println("actionsToCheck.size(): " + actionsToCheck.size());
	ArrayList<FileComponent> array = new ArrayList<FileComponent>(actionsToCheck);
	for(FileComponent comp : array){
		System.out.println(comp.getPath() + ": " + comp.getAction().getCurrentState().getClass().toString());
	}
	assertTrue(actionsToCheck.size() == 2);
	assertTrue(array.get(0).getAction().getCurrentState() instanceof InitialState);
	assertTrue(array.get(0).getPath().toString().equals(filePaths.get(4)));
	assertTrue(array.get(1).getAction().getCurrentState() instanceof LocalCreateState);
	assertTrue(array.get(1).getPath().toString().equals(filePaths.get(5)));

	long end = System.currentTimeMillis();
	assertTrue(end - start <= config.getAggregationIntervalInMillis());
	sleepMillis(config.getAggregationIntervalInMillis() * 5);
}
 
源代码16 项目: rocketmq   文件: BrokerFastFailure.java
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
    while (true) {
        try {
            if (!blockingQueue.isEmpty()) {
                final Runnable runnable = blockingQueue.peek();
                if (null == runnable) {
                    break;
                }
                final RequestTask rt = castRunnable(runnable);
                if (rt == null || rt.isStopRun()) {
                    break;
                }

                final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                if (behind >= maxWaitTimeMillsInQueue) {
                    if (blockingQueue.remove(runnable)) {
                        rt.setStopRun(true);
                        rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
                    }
                } else {
                    break;
                }
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }
}
 
源代码17 项目: ehcache3   文件: InvocationScopedEventSink.java
private void fireOrdered(Set<StoreEventListener<K, V>> listeners, Deque<FireableStoreEventHolder<K, V>> events) {
  for (FireableStoreEventHolder<K, V> fireableEvent : events) {
    fireableEvent.markFireable();

    BlockingQueue<FireableStoreEventHolder<K, V>> orderedQueue = getOrderedQueue(fireableEvent);
    FireableStoreEventHolder<K, V> head = orderedQueue.peek();
    if (head == fireableEvent) {
      // Need to fire my event, plus any it was blocking
      if (head.markFired()) {
        // Only proceed if I am the one marking fired
        // Do not notify failed events
        for (StoreEventListener<K, V> listener : listeners) {
          head.fireOn(listener);
        }
        orderedQueue.poll(); // Remove the event I just handled
      } else {
        // Someone else fired it - stopping there
        // Lost the fire race - may need to wait for full processing
        fireableEvent.waitTillFired();
      }
      fireWaiters(listeners, orderedQueue);
    } else {
      // Waiting for another thread to fire - once that happens, done for this event
      fireableEvent.waitTillFired();
    }
  }
}
 
源代码18 项目: PeerWasp   文件: FileEventManagerTest.java
/**
 * This test issues several modify events for the same file over a long
 * period to check if the events are aggregated accordingly.
 * @throws IOException
 */
@Test
public void onFileModifiedTest() throws IOException{
	BlockingQueue<FileComponent> actionsToCheck = manager.getFileComponentQueue().getQueue();;

	long start = System.currentTimeMillis();
	System.out.println("Start onFileModifiedTest");
	manager.onLocalFileCreated(Paths.get(filePaths.get(0)));
	manager.onLocalFileModified(Paths.get(filePaths.get(0)));
	assertTrue(actionsToCheck.size() == 1);
	assertNotNull(actionsToCheck);
	assertNotNull(actionsToCheck.peek());
	assertNotNull(actionsToCheck.peek().getAction().getCurrentState());
	assertTrue(actionsToCheck.peek().getAction().getCurrentState() instanceof LocalCreateState); //no null pointers should occur anymore here

	long end = System.currentTimeMillis();

	assertTrue(end - start <= config.getAggregationIntervalInMillis());

	//issue continuous modifies over a period longer than the wait time
	sleepMillis(config.getAggregationIntervalInMillis() * 2);

	FileTestUtils.writeRandomData(files.get(0).toPath(), 50);
	manager.onLocalFileModified(Paths.get(filePaths.get(0)));
	sleepMillis(config.getAggregationIntervalInMillis() / 2);

	FileTestUtils.writeRandomData(files.get(0).toPath(), 50);
	manager.onLocalFileModified(Paths.get(filePaths.get(0)));
	sleepMillis(config.getAggregationIntervalInMillis() / 2);

	FileComponent comp = actionsToCheck.peek();
	assertTrue(actionsToCheck.peek().getAction().getCurrentState() instanceof LocalUpdateState);
	assertTrue(actionsToCheck.size() == 1);

	sleepMillis(config.getAggregationIntervalInMillis() * 2);
	printBlockingQueue(actionsToCheck);
	assertTrue(actionsToCheck.size() == 0);
//	System.out.println(comp.getAction().getCurrentState().getClass());

	//cleanup
	manager.onLocalFileHardDelete(Paths.get(filePaths.get(0)));
	sleepMillis(200);
	manager.onLocalFileDeleted(Paths.get(filePaths.get(0)));
	sleepMillis(config.getAggregationIntervalInMillis() * 5);
	assertTrue(manager.getFileTree().getFile(files.get(0).toPath()) == null);
	assertTrue(comp.getAction().getCurrentState() instanceof InitialState);
	assertTrue(actionsToCheck.size() == 0);

}
 
源代码19 项目: PeerWasp   文件: FileEventManagerTest.java
/**
 * Simulate a file delete and an additional modify event, check if the file
 * remains in the delete state and only one action is stored in the queue.
 */
@Test
public void onFileDeletedTest(){
	BlockingQueue<FileComponent> actionsToCheck = manager.getFileComponentQueue().getQueue();;
	SetMultimap<String, FileComponent> deletedFiles = manager.getFileTree().getDeletedByContentHash();
	System.out.println("Start onFileDeletedTest");
	manager.onLocalFileCreated(Paths.get(filePaths.get(0)));
	FileComponent createdFile = actionsToCheck.peek();
	//HERE
	assertTrue(actionsToCheck.size() == 1);
	assertTrue(createdFile.getAction().getCurrentState() instanceof LocalCreateState);

	sleepMillis(config.getAggregationIntervalInMillis() * 2);

	assertTrue(createdFile.getAction().getCurrentState() instanceof EstablishedState);
	assertTrue(actionsToCheck.size() == 0);

	long start = System.currentTimeMillis();

	manager.onLocalFileHardDelete(Paths.get(filePaths.get(0)));
	sleepMillis(200);
	manager.onLocalFileDeleted(Paths.get(filePaths.get(0)));
	System.out.println(actionsToCheck.size());
	assertTrue(actionsToCheck.size() == 1);
	assertTrue(actionsToCheck.peek().getAction().getCurrentState() instanceof LocalHardDeleteState);

	manager.onLocalFileModified(Paths.get(filePaths.get(0)));
	assertTrue(actionsToCheck.size() == 1);
	assertTrue(actionsToCheck.peek().getAction().getCurrentState() instanceof LocalHardDeleteState);

	System.out.println("deletedFiles.size(): " + deletedFiles.size());
	//assertTrue(deletedFiles.size() == 1);
	//Set<FileComponent> equalHashes = deletedFiles.get(createdFile.getContentHash());
	//assertTrue(equalHashes.size() == 1);
	//assertTrue(equalHashes.contains(createdFile));

	//check if the testcase was run in time
	long end = System.currentTimeMillis();
	assertTrue(end - start <= config.getAggregationIntervalInMillis());



	sleepMillis(config.getAggregationIntervalInMillis() * 5);

	assertTrue(actionsToCheck.size() == 0);
	assertTrue(manager.getFileTree().getFile(files.get(0).toPath()) == null);
	System.out.println(createdFile.getAction().getCurrentState().getClass());
	assertTrue(createdFile.getAction().getCurrentState() instanceof InitialState);
	System.out.println(actionsToCheck.size());
	assertTrue(deletedFiles.size() == 0);
}
 
源代码20 项目: SI   文件: InMemoryMessageStore.java
@Override
public QueuedRequest retrieveFirst(String endpoint) {
    LOG.trace("Retrieve first for endpoint {}", endpoint);
    BlockingQueue<QueuedRequest> requests = getMessageQueueForEndpoint(endpoint);
    return requests.peek();
}