下面列出了 java.util.concurrent.BlockingQueue#peek() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Collects the updates currently waiting in the queue into a single batch.
 *
 * <p>Waits up to 1000 ms for a first update. When one arrives, everything else that is
 * already queued is drained into the same batch behind it.
 *
 * @param updatesQueue queue the pending updates are read from
 * @return the batch with the first polled update at index 0, or {@code null} when no update
 *     arrived within the wait
 * @throws InterruptedException if the thread is interrupted while waiting for the first update
 */
private List<Update<T>> getUpdateBatch(
        final BlockingQueue<Update<T>> updatesQueue) throws InterruptedException {
    final Update<T> first = updatesQueue.poll(1000, TimeUnit.MILLISECONDS);
    if (first == null) {
        // Timed out without receiving anything.
        logger.trace("No update (yet)");
        return null;
    }

    final List<Update<T>> batch = new ArrayList<>();
    batch.add(first);

    // The peek only decides which message is logged; draining an empty queue adds nothing.
    final boolean moreQueued = updatesQueue.peek() != null;
    if (moreQueued) {
        logger.debug("Subscription received update, queue not empty, draining ...");
        updatesQueue.drainTo(batch);
    } else {
        logger.debug("Subscription received single update");
    }
    return batch;
}
/**
 * Waits on the queued futures, head first, until the queue reports at least
 * {@code threshold} free slots.
 *
 * <p>Each round peeks the head element, waits up to {@code sleepTime} for its future and then
 * removes the head of the queue. An element whose wait ends with any exception (timeout,
 * execution failure, interruption) is appended to {@code failedFutures}.
 *
 * <p>If the waiting thread is interrupted, the current element is still recorded as failed and
 * removed, the thread's interrupt status is restored, and draining stops. Previously the
 * interruption was swallowed by the broad catch and the loop carried on.
 *
 * <p>NOTE(review): when {@code sleepTime <= 0}, or when the queue is empty while its remaining
 * capacity is below {@code threshold}, the loop body does nothing and this method spins until
 * another thread changes the queue. Left unchanged because the intended behaviour is not
 * visible from here.
 *
 * @param queue bounded queue of document/future pairs
 * @param threshold minimum remaining capacity required before returning
 * @param sleepTime maximum time to wait on each future
 * @param sleepUnit unit of {@code sleepTime}
 * @param failedFutures receives every element whose wait ended with an exception
 */
private void drainQueue(BlockingQueue<Pair<AbstractDocument, Future>> queue, int threshold, long sleepTime,
        TimeUnit sleepUnit, List<Pair<AbstractDocument, Future>> failedFutures) {
    while (queue.remainingCapacity() < threshold) {
        if (sleepTime > 0) {
            Pair<AbstractDocument, Future> topElement = queue.peek();
            if (topElement != null) {
                boolean interrupted = false;
                try {
                    topElement.getSecond().get(sleepTime, sleepUnit);
                } catch (Exception te) {
                    failedFutures.add(topElement);
                    // Remember an interruption so it can be propagated once the
                    // bookkeeping for this element is finished.
                    interrupted = te instanceof InterruptedException;
                }
                queue.poll();
                if (interrupted) {
                    // Restore the flag for the caller and stop: with the flag set, every
                    // further get() would fail immediately.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
/**
 * Sends the entries queued for a channel, one after another, until the queue is empty or
 * an entry cannot be sent completely.
 *
 * <p>When the queue is found empty, the channel is removed from {@code queues} and
 * {@code registered} and the selection key is cancelled. An entry is only removed from the
 * queue after {@code sendAllBuffers} returned {@code true} for it and the sender was notified.
 *
 * @param q pending entries for the channel
 * @param c channel the entries are written to
 * @param key selection key associated with the channel
 */
private void sendAllEntries(BlockingQueue<NIOSendEntry> q, WritableByteChannel c, SelectionKey key) {
    for (;;) {
        final NIOSendEntry entry;
        // Look at the head under the queues lock so the clean-up below happens in the same
        // critical section as the emptiness check.
        synchronized(queues) {
            entry = q.peek();
            if (entry == null) {
                queues.remove(c);
                key.cancel();
                registered.remove(c);
                return;
            }
        }

        if (!sendAllBuffers(entry, c, key)) {
            // Not fully sent: keep the entry at the head of the queue and stop here.
            return;
        }
        notifySender(entry);
        q.remove();
    }
}
/**
 * Fires the events at the head of the ordered queue for as long as the head is fireable and
 * this thread wins the right to fire it.
 *
 * <p>Stops as soon as the queue is empty, the head is not fireable, or {@code markFired()}
 * returns {@code false} for the head.
 *
 * @param listeners listeners each fired event is delivered to
 * @param orderedQueue queue holding the events in firing order
 */
private void fireWaiters(Set<StoreEventListener<K, V>> listeners, BlockingQueue<FireableStoreEventHolder<K, V>> orderedQueue) {
    while (true) {
        final FireableStoreEventHolder<K, V> next = orderedQueue.peek();
        if (next == null || !next.isFireable()) {
            return;
        }
        if (!next.markFired()) {
            // markFired() refused, so this caller is not the one firing the event: stop.
            return;
        }
        for (StoreEventListener<K, V> listener : listeners) {
            next.fireOn(listener);
        }
        // Drop the event that was just delivered.
        orderedQueue.poll();
    }
}
/**
 * Returns how long the task at the head of the queue has been waiting, in milliseconds.
 *
 * <p>The age is {@code messageStore.now()} minus the task's creation timestamp.
 *
 * @param q queue whose head is inspected; the head is not removed
 * @return the head task's age, or {@code 0} when the queue is empty, the head cannot be
 *     converted to a {@code RequestTask}, or the computed age is negative
 */
public long headSlowTimeMills(BlockingQueue<Runnable> q) {
    final Runnable head = q.peek();
    if (head == null) {
        return 0;
    }
    final RequestTask task = BrokerFastFailure.castRunnable(head);
    if (task == null) {
        return 0;
    }
    final long waited = this.messageStore.now() - task.getCreateTimestamp();
    return Math.max(waited, 0L);
}
/**
 * Removes requests from the head of the queue that have waited at least
 * {@code maxWaitTimeMillsInQueue} milliseconds and answers each with {@code SYSTEM_BUSY}.
 *
 * <p>Scanning stops at the first head that is missing, cannot be converted to a
 * {@code RequestTask}, is already marked as stopped, or has not waited long enough. Any
 * {@code Throwable} raised during a round is swallowed and the loop starts another round.
 *
 * @param blockingQueue queue to clean
 * @param maxWaitTimeMillsInQueue maximum time in milliseconds a request may stay queued
 */
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
    while (true) {
        try {
            if (blockingQueue.isEmpty()) {
                break;
            }
            final Runnable head = blockingQueue.peek();
            if (head == null) {
                break;
            }
            final RequestTask task = castRunnable(head);
            if (task == null || task.isStopRun()) {
                break;
            }
            final long behind = System.currentTimeMillis() - task.getCreateTimestamp();
            if (behind < maxWaitTimeMillsInQueue) {
                // The head has not expired yet, so scanning stops here.
                break;
            }
            // remove() returns false when the head is no longer in the queue; in that case
            // nothing is answered and the next round looks at the new head.
            if (blockingQueue.remove(head)) {
                task.setStopRun(true);
                task.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
            }
        } catch (Throwable ignored) {
        }
    }
}
/**
 * Computes the time, in milliseconds, that the head task of the queue has spent waiting.
 *
 * <p>The head is only peeked, never removed. The age is measured as
 * {@code messageStore.now()} minus the task's creation timestamp and is never negative.
 *
 * @param q queue to inspect
 * @return the waiting time of the head task, or {@code 0} if there is no head, the head is
 *     not convertible to a {@code RequestTask}, or the difference is below zero
 */
public long headSlowTimeMills(BlockingQueue<Runnable> q) {
    final Runnable head = q.peek();
    final RequestTask task = head == null ? null : BrokerFastFailure.castRunnable(head);
    if (task == null) {
        return 0;
    }
    final long elapsed = this.messageStore.now() - task.getCreateTimestamp();
    return elapsed < 0 ? 0 : elapsed;
}
/**
 * Drops expired requests from the front of the queue and replies {@code SYSTEM_BUSY} to each.
 *
 * <p>A request counts as expired once it has been queued for at least
 * {@code maxWaitTimeMillsInQueue} milliseconds. The scan ends at the first head that is
 * absent, not convertible to a {@code RequestTask}, already stopped, or not yet expired.
 * Exceptions thrown inside a round are ignored and the loop runs again.
 *
 * @param blockingQueue queue to scan
 * @param maxWaitTimeMillsInQueue expiry limit in milliseconds
 */
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
    for (;;) {
        try {
            if (blockingQueue.isEmpty()) {
                break;
            }
            final Runnable candidate = blockingQueue.peek();
            if (candidate == null) {
                break;
            }
            final RequestTask request = castRunnable(candidate);
            if (request == null || request.isStopRun()) {
                break;
            }
            final long queuedFor = System.currentTimeMillis() - request.getCreateTimestamp();
            if (queuedFor < maxWaitTimeMillsInQueue) {
                break;
            }
            // Only the caller that actually removed the task answers it.
            final boolean removed = blockingQueue.remove(candidate);
            if (removed) {
                request.setStopRun(true);
                request.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", queuedFor, blockingQueue.size()));
            }
        } catch (Throwable ignored) {
        }
    }
}
/**
 * Tells whether the archive at the head of the queue is old enough to be removed.
 *
 * <p>The head is only peeked; nothing is deleted or dequeued here.
 *
 * @param fileQueue queue of archive files; presumably ordered oldest first (verify against
 *     the caller)
 * @param removalTimeThreshold last-modified time at or below which an archive qualifies
 * @return {@code true} if the head's last-modified time is {@code <= removalTimeThreshold};
 *     {@code false} if it is newer or the queue is empty
 * @throws IOException if the last-modified time cannot be read
 */
private boolean deleteBasedOnTimestamp(final BlockingQueue<ArchiveInfo> fileQueue, final long removalTimeThreshold) throws IOException {
    final ArchiveInfo head = fileQueue.peek();
    if (head != null) {
        // Compare the head's last-modified time against the threshold.
        return getLastModTime(head.toPath()) <= removalTimeThreshold;
    }
    // Nothing queued yet: report "no" so the caller can continue queueing files.
    return false;
}
/**
 * Clears timed-out requests from the given queue and answers each of them with a
 * system-busy response.
 *
 * <p>Only the head of the queue is examined in each round. The loop ends when the queue is
 * empty, the head cannot be converted to a {@code RequestTask}, the head is already marked as
 * stopped, or the head has waited less than the timeout. Any {@code Throwable} raised in a
 * round is swallowed and the loop continues.
 *
 * @param blockingQueue the queue to clean
 * @param maxWaitTimeMillsInQueue the timeout in milliseconds
 */
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
    while (true) {
        try {
            final Runnable head = blockingQueue.isEmpty() ? null : blockingQueue.peek();
            if (head == null) {
                break;
            }
            final RequestTask task = castRunnable(head);
            if (task == null || task.isStopRun()) {
                break;
            }
            final long behind = System.currentTimeMillis() - task.getCreateTimestamp();
            if (behind < maxWaitTimeMillsInQueue) {
                break;
            }
            if (!blockingQueue.remove(head)) {
                // The head is already gone from the queue; re-check from the new head.
                continue;
            }
            task.setStopRun(true);
            task.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
        } catch (Throwable ignored) {
        }
    }
}
/**
 * Simulates a create event and waits twice the configured aggregation interval for it to be
 * handled. After that, a move is simulated using a delete event on the same file and a
 * create event on a new file (different name), and the queued action is expected to be in
 * {@code LocalMoveState}.
 */
@Test
public void fromDeleteToMoveTest() {
    // handle artificial create event, wait for handling
    manager.onLocalFileCreated(Paths.get(filePaths.get(7)));
    BlockingQueue<FileComponent> actionsToCheck = manager.getFileComponentQueue().getQueue();
    FileComponent file1 = actionsToCheck.peek();
    sleepMillis(config.getAggregationIntervalInMillis() * 2);

    // after the wait the queue must be empty again
    assertTrue(actionsToCheck.size() == 0);

    // initiate delete event
    long start = System.currentTimeMillis();
    manager.onLocalFileDeleted(Paths.get(filePaths.get(7)));
    assertTrue(actionsToCheck.size() == 1);

    // initiate re-creation, ensure that all happens in time
    manager.onLocalFileCreated(Paths.get(filePaths.get(8)));
    assertTrue(actionsToCheck.size() == 1);
    System.out.println(actionsToCheck.peek().getAction().getCurrentState().getClass());
    assertTrue(actionsToCheck.peek().getAction().getCurrentState() instanceof LocalMoveState);

    // the delete + create pair must fall within one aggregation interval
    long end = System.currentTimeMillis();
    assertTrue(end - start <= config.getAggregationIntervalInMillis());
    sleepMillis(config.getAggregationIntervalInMillis() * 2);

    // cleanup
    deleteFile(Paths.get(filePaths.get(8)));
    sleepMillis(config.getAggregationIntervalInMillis() * 2);
    assertTrue(manager.getFileTree().getFile(files.get(8).toPath()) == null);
    assertTrue(actionsToCheck.size() == 0);
    // NOTE(review): the original asserted this on file1 twice and never used the component
    // peeked after the re-creation; the second check may have been meant for that one - confirm.
    assertTrue(file1.getAction().getCurrentState() instanceof InitialState);
}
/**
 * Reports the age in milliseconds of the task currently at the head of the queue.
 *
 * <p>Age is {@code messageStore.now()} minus the task's creation timestamp, clamped so it is
 * never negative. The queue is not modified.
 *
 * @param q queue whose head task is measured
 * @return the clamped age, or {@code 0} for an empty queue or a head that cannot be converted
 *     to a {@code RequestTask}
 */
public long headSlowTimeMills(BlockingQueue<Runnable> q) {
    final Runnable first = q.peek();
    if (first != null) {
        final RequestTask task = BrokerFastFailure.castRunnable(first);
        if (task != null) {
            final long age = this.messageStore.now() - task.getCreateTimestamp();
            if (age > 0) {
                return age;
            }
        }
    }
    return 0;
}
/**
 * Measures how many milliseconds the first task in the queue has been pending.
 *
 * <p>Uses {@code messageStore.now()} as the current time and the task's creation timestamp
 * as the start; a negative difference is reported as {@code 0}. The head stays in the queue.
 *
 * @param q queue to look at
 * @return pending time of the head task; {@code 0} when the queue has no head or the head is
 *     not convertible to a {@code RequestTask}
 */
public long headSlowTimeMills(BlockingQueue<Runnable> q) {
    final Runnable head = q.peek();
    if (head == null) {
        return 0;
    }

    final RequestTask request = BrokerFastFailure.castRunnable(head);
    final long pending = request == null ? 0 : this.messageStore.now() - request.getCreateTimestamp();
    return Math.max(0L, pending);
}
/**
 * Returns, without removing it, the head of the queue that {@code getFirstNonEmptyQueue(0)}
 * selects.
 *
 * <p>Peek, like poll, provides no strict consistency.
 *
 * @return the peeked element, or {@code null} if no queue was selected or the selected queue
 *     has no head at the time of the call
 */
@Override
public E peek() {
    final BlockingQueue<E> candidate = this.getFirstNonEmptyQueue(0);
    return candidate == null ? null : candidate.peek();
}
/**
 * This test simulates the process of creating AND moving/renaming a file
 * before the upload to the network was triggered. Therefore, the old file should
 * be ignored (initial state, where execute does nothing) and the new file should
 * be pushed as a create.
 */
@Test
public void createOnLocalMove() {
    long start = System.currentTimeMillis();
    BlockingQueue<FileComponent> actionsToCheck = manager.getFileComponentQueue().getQueue();
    assertTrue(actionsToCheck.size() == 0);

    manager.onLocalFileCreated(Paths.get(filePaths.get(4)));
    sleepMillis(10);

    // move the file LOCALLY: delete the old path, then announce the new one
    Paths.get(filePaths.get(4)).toFile().delete();
    manager.onLocalFileDeleted(Paths.get(filePaths.get(4)));
    sleepMillis(10);
    manager.onLocalFileCreated(Paths.get(filePaths.get(5)));

    // snapshot the queue so both entries can be inspected by position
    System.out.println("actionsToCheck.size(): " + actionsToCheck.size());
    ArrayList<FileComponent> array = new ArrayList<FileComponent>(actionsToCheck);
    for (FileComponent comp : array) {
        System.out.println(comp.getPath() + ": " + comp.getAction().getCurrentState().getClass().toString());
    }

    // old path is back in its initial state, new path is queued as a create
    assertTrue(actionsToCheck.size() == 2);
    assertTrue(array.get(0).getAction().getCurrentState() instanceof InitialState);
    assertTrue(array.get(0).getPath().toString().equals(filePaths.get(4)));
    assertTrue(array.get(1).getAction().getCurrentState() instanceof LocalCreateState);
    assertTrue(array.get(1).getPath().toString().equals(filePaths.get(5)));

    // everything above must have happened within one aggregation interval
    long end = System.currentTimeMillis();
    assertTrue(end - start <= config.getAggregationIntervalInMillis());
    sleepMillis(config.getAggregationIntervalInMillis() * 5);
}
/**
 * Expires requests that have been sitting at the head of the queue for too long.
 *
 * <p>While the head task has waited at least {@code maxWaitTimeMillsInQueue} milliseconds it
 * is removed, marked as stopped, and answered with {@code SYSTEM_BUSY}. Processing ends at
 * the first head that is absent, not convertible to a {@code RequestTask}, already stopped,
 * or still within the limit. A {@code Throwable} thrown in a round is ignored and the loop
 * goes on.
 *
 * @param blockingQueue queue whose head is checked repeatedly
 * @param maxWaitTimeMillsInQueue allowed waiting time in milliseconds
 */
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
    while (true) {
        try {
            if (blockingQueue.isEmpty()) {
                break;
            }

            final Runnable headRunnable = blockingQueue.peek();
            if (headRunnable == null) {
                break;
            }

            final RequestTask headTask = castRunnable(headRunnable);
            final boolean skip = headTask == null || headTask.isStopRun();
            if (skip) {
                break;
            }

            final long waitedMillis = System.currentTimeMillis() - headTask.getCreateTimestamp();
            if (!(waitedMillis >= maxWaitTimeMillsInQueue)) {
                break;
            }

            // Answer the request only if this call is the one that removed it.
            if (blockingQueue.remove(headRunnable)) {
                headTask.setStopRun(true);
                headTask.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", waitedMillis, blockingQueue.size()));
            }
        } catch (Throwable ignored) {
        }
    }
}
/**
 * Fires the given events while respecting the order of the queue each event belongs to.
 *
 * <p>Every event is first marked fireable. If it is the head of its ordered queue, this
 * thread tries to fire it (or waits until it has been fired when {@code markFired()} returns
 * {@code false}) and then fires any fireable events behind it via {@code fireWaiters}. If it
 * is not the head, this thread waits until the event has been fired.
 *
 * @param listeners listeners the events are delivered to
 * @param events events to fire, processed in iteration order
 */
private void fireOrdered(Set<StoreEventListener<K, V>> listeners, Deque<FireableStoreEventHolder<K, V>> events) {
    for (FireableStoreEventHolder<K, V> event : events) {
        event.markFireable();
        final BlockingQueue<FireableStoreEventHolder<K, V>> queue = getOrderedQueue(event);

        if (queue.peek() != event) {
            // Not at the head: wait until this event has been fired, then move on.
            event.waitTillFired();
            continue;
        }

        // This event is the head: fire it, then anything it was holding back.
        if (event.markFired()) {
            // markFired() succeeded, so this thread does the delivery.
            for (StoreEventListener<K, V> listener : listeners) {
                event.fireOn(listener);
            }
            queue.poll(); // remove the event that was just handled
        } else {
            // Lost the fire race - wait until the event is reported as fired.
            event.waitTillFired();
        }
        fireWaiters(listeners, queue);
    }
}
/**
 * This test issues several modify events for the same file over a long
 * period to check if the events are aggregated accordingly.
 *
 * @throws IOException declared for the random-data writes done through
 *     {@code FileTestUtils.writeRandomData}
 */
@Test
public void onFileModifiedTest() throws IOException {
    BlockingQueue<FileComponent> actionsToCheck = manager.getFileComponentQueue().getQueue();
    // check the queue reference before it is dereferenced for the first time
    assertNotNull(actionsToCheck);

    long start = System.currentTimeMillis();
    System.out.println("Start onFileModifiedTest");
    manager.onLocalFileCreated(Paths.get(filePaths.get(0)));
    manager.onLocalFileModified(Paths.get(filePaths.get(0)));

    // create + modify must collapse into a single queued create action
    assertTrue(actionsToCheck.size() == 1);
    assertNotNull(actionsToCheck.peek());
    assertNotNull(actionsToCheck.peek().getAction().getCurrentState());
    assertTrue(actionsToCheck.peek().getAction().getCurrentState() instanceof LocalCreateState); //no null pointers should occur anymore here
    long end = System.currentTimeMillis();
    assertTrue(end - start <= config.getAggregationIntervalInMillis());

    //issue continuous modifies over a period longer than the wait time
    sleepMillis(config.getAggregationIntervalInMillis() * 2);
    FileTestUtils.writeRandomData(files.get(0).toPath(), 50);
    manager.onLocalFileModified(Paths.get(filePaths.get(0)));
    sleepMillis(config.getAggregationIntervalInMillis() / 2);
    FileTestUtils.writeRandomData(files.get(0).toPath(), 50);
    manager.onLocalFileModified(Paths.get(filePaths.get(0)));
    sleepMillis(config.getAggregationIntervalInMillis() / 2);

    // the repeated modifies are expected to be aggregated into one update action
    FileComponent comp = actionsToCheck.peek();
    assertTrue(actionsToCheck.peek().getAction().getCurrentState() instanceof LocalUpdateState);
    assertTrue(actionsToCheck.size() == 1);
    sleepMillis(config.getAggregationIntervalInMillis() * 2);
    printBlockingQueue(actionsToCheck);
    assertTrue(actionsToCheck.size() == 0);

    //cleanup
    manager.onLocalFileHardDelete(Paths.get(filePaths.get(0)));
    sleepMillis(200);
    manager.onLocalFileDeleted(Paths.get(filePaths.get(0)));
    sleepMillis(config.getAggregationIntervalInMillis() * 5);
    assertTrue(manager.getFileTree().getFile(files.get(0).toPath()) == null);
    assertTrue(comp.getAction().getCurrentState() instanceof InitialState);
    assertTrue(actionsToCheck.size() == 0);
}
/**
 * Simulates a hard delete of a previously created file followed by an additional
 * modify event, and checks that the queued action stays in the hard-delete state
 * and that only one action is stored in the queue.
 */
@Test
public void onFileDeletedTest(){
BlockingQueue<FileComponent> actionsToCheck = manager.getFileComponentQueue().getQueue();;
SetMultimap<String, FileComponent> deletedFiles = manager.getFileTree().getDeletedByContentHash();
System.out.println("Start onFileDeletedTest");
// create the file and keep a reference to its queued component for the later state checks
manager.onLocalFileCreated(Paths.get(filePaths.get(0)));
FileComponent createdFile = actionsToCheck.peek();
// NOTE(review): createdFile is dereferenced below without a null check - confirm peek() cannot return null here
assertTrue(actionsToCheck.size() == 1);
assertTrue(createdFile.getAction().getCurrentState() instanceof LocalCreateState);
// after two aggregation intervals the create is expected to be handled and dequeued
sleepMillis(config.getAggregationIntervalInMillis() * 2);
assertTrue(createdFile.getAction().getCurrentState() instanceof EstablishedState);
assertTrue(actionsToCheck.size() == 0);
// hard delete, then the regular delete event 200 ms later
long start = System.currentTimeMillis();
manager.onLocalFileHardDelete(Paths.get(filePaths.get(0)));
sleepMillis(200);
manager.onLocalFileDeleted(Paths.get(filePaths.get(0)));
System.out.println(actionsToCheck.size());
assertTrue(actionsToCheck.size() == 1);
assertTrue(actionsToCheck.peek().getAction().getCurrentState() instanceof LocalHardDeleteState);
// an extra modify event must neither add an action nor change the state
manager.onLocalFileModified(Paths.get(filePaths.get(0)));
assertTrue(actionsToCheck.size() == 1);
assertTrue(actionsToCheck.peek().getAction().getCurrentState() instanceof LocalHardDeleteState);
System.out.println("deletedFiles.size(): " + deletedFiles.size());
// disabled checks on the deleted-by-content-hash map, kept as found
//assertTrue(deletedFiles.size() == 1);
//Set<FileComponent> equalHashes = deletedFiles.get(createdFile.getContentHash());
//assertTrue(equalHashes.size() == 1);
//assertTrue(equalHashes.contains(createdFile));
//check if the testcase was run in time
long end = System.currentTimeMillis();
assertTrue(end - start <= config.getAggregationIntervalInMillis());
// wait for the delete to be processed, then verify queue, file tree and deleted-files map
sleepMillis(config.getAggregationIntervalInMillis() * 5);
assertTrue(actionsToCheck.size() == 0);
assertTrue(manager.getFileTree().getFile(files.get(0).toPath()) == null);
System.out.println(createdFile.getAction().getCurrentState().getClass());
assertTrue(createdFile.getAction().getCurrentState() instanceof InitialState);
System.out.println(actionsToCheck.size());
assertTrue(deletedFiles.size() == 0);
}
/**
 * Returns the first queued request for an endpoint without removing it from the queue.
 *
 * @param endpoint endpoint whose message queue is looked up
 * @return the head of that endpoint's queue, or {@code null} if the queue is empty
 */
@Override
public QueuedRequest retrieveFirst(String endpoint) {
    LOG.trace("Retrieve first for endpoint {}", endpoint);
    return getMessageQueueForEndpoint(endpoint).peek();
}