下面列出了怎么用com.google.common.collect.Queues的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testSerializeStateHistory() throws Exception {
SourceState firstState = new SourceState(15l, 20l, -1l, BINLOG_FILE_POS);
SourceState secondState = new SourceState(16l, 21l, -1l, BINLOG_FILE_POS);
SourceState thirdState = new SourceState(17l, 22l, -1l, BINLOG_FILE_POS);
Deque<SourceState> stateHistory = Queues.newArrayDeque();
stateHistory.addLast(firstState);
stateHistory.addLast(secondState);
stateHistory.addLast(thirdState);
Collection<SourceState> states =
JsonUtil.OBJECT_MAPPER.readValue(
JsonUtil.OBJECT_MAPPER.writeValueAsString(stateHistory),
new TypeReference<Collection<SourceState>>() {});
stateHistory = Queues.newArrayDeque(states);
assertEquals(3, states.size());
assertEquals(thirdState, stateHistory.removeLast());
assertEquals(secondState, stateHistory.removeLast());
assertEquals(firstState, stateHistory.removeLast());
}
@Override
public void afterPropertiesSet() throws Exception {
auditExecutorService.submit(() -> {
while (!auditStopped.get() && !Thread.currentThread().isInterrupted()) {
List<ConsumerAudit> toAudit = Lists.newArrayList();
try {
Queues.drain(audits, toAudit, BATCH_SIZE, BATCH_TIMEOUT, BATCH_TIMEUNIT);
if (!toAudit.isEmpty()) {
consumerService.createConsumerAudits(toAudit);
}
} catch (Throwable ex) {
Tracer.logError(ex);
}
}
});
}
PubSubClient(String hostname, int port, int maxPendingMessages) throws IOException {
this.hostname = hostname;
this.port = port;
this.maxPendingMessages = maxPendingMessages;
if (maxPendingMessages <= 0) {
this.pending = Queues.newLinkedBlockingDeque();
} else {
this.pending = Queues.newLinkedBlockingDeque(maxPendingMessages);
}
this.selector = Selector.open();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
close();
}
});
}
private ImmutableSet<TargetIdeInfo> targetsForSourceFilesImpl(
ImmutableMultimap<TargetKey, TargetKey> rdepsMap, Collection<File> sourceFiles) {
ImmutableSet.Builder<TargetIdeInfo> result = ImmutableSet.builder();
Set<TargetKey> roots =
sourceFiles.stream()
.flatMap(f -> rootsMap.get(f).stream())
.collect(ImmutableSet.toImmutableSet());
Queue<TargetKey> todo = Queues.newArrayDeque();
todo.addAll(roots);
Set<TargetKey> seen = Sets.newHashSet();
while (!todo.isEmpty()) {
TargetKey targetKey = todo.remove();
if (!seen.add(targetKey)) {
continue;
}
TargetIdeInfo target = targetMap.get(targetKey);
if (filter.test(target)) {
result.add(target);
}
todo.addAll(rdepsMap.get(targetKey));
}
return result.build();
}
@Override
public void breadthFirstSearch(Task root, Predicate<Pair<Task, Task>> predicate) {
Preconditions.checkNotNull(root);
Queue<Task> queue = Queues.newArrayDeque();
if (predicate.apply(Pair.create((Task) null, root))) {
queue.add(root);
}
while (!queue.isEmpty()) {
Task head = queue.poll();
for (Task child : head.getNestedTasks()) {
if (predicate.apply(Pair.create(head, child))) {
queue.add(child);
}
}
}
}
@Override
public void clear() {
Queue<DocumentPath> toClearQueue = Queues.newArrayDeque();
Map<String, Versioned<byte[]>> topLevelChildren = docTree.getChildren(DocumentPath.ROOT);
toClearQueue.addAll(topLevelChildren.keySet()
.stream()
.map(name -> new DocumentPath(name, DocumentPath.ROOT))
.collect(Collectors.toList()));
while (!toClearQueue.isEmpty()) {
DocumentPath path = toClearQueue.remove();
Map<String, Versioned<byte[]>> children = docTree.getChildren(path);
if (children.size() == 0) {
docTree.remove(path);
} else {
children.keySet().forEach(name -> toClearQueue.add(new DocumentPath(name, path)));
toClearQueue.add(path);
}
}
}
public void onRequestWriteOrEnqueue(
ChannelHandlerContext ctx, Integer streamId, Object request, ChannelPromise promise) {
if (streamId == null || streamId == Message.H1_STREAM_ID_NONE) {
log.debug("writing request {}", request);
ctx.write(request, promise);
} else {
boolean shouldWrite =
currentProxiedH2StreamId().map(id -> id.equals(streamId)).orElse(Boolean.TRUE);
Queue<PendingRequest> queue =
streamQueue.computeIfAbsent(streamId, k -> Queues.newArrayDeque());
if (shouldWrite) {
log.debug("writing h2-h1 proxy request {}", request);
ctx.write(request, promise);
} else {
log.debug("enqueuing h2-h1 proxy request {}", request);
queue.offer(new PendingRequest(request, promise));
}
}
}
@Test
public void testForwardingOfRequests() throws Exception {
Queue<RequestAndCallback> queue = Queues.newArrayDeque();
BatchedPermitsRequester container = BatchedPermitsRequester.builder().resourceId("resource")
.requestorIdentifier("requestor").requestSender(new TestRequestSender(queue, false)).build();
try (ParallelRequester requester = new ParallelRequester(container)) {
Future<Boolean> future = requester.request(10);
await(new QueueSize(queue, 1), 1000);
Assert.assertEquals(queue.size(), 1);
satisfyRequestBuilder().requestAndCallback(queue.poll()).satisfy();
future.get(1, TimeUnit.SECONDS);
Assert.assertTrue(future.isDone());
Assert.assertTrue(future.get());
}
}
@Test
public void testRetriableFail() throws Exception {
Queue<RequestAndCallback> queue = Queues.newArrayDeque();
BatchedPermitsRequester container = BatchedPermitsRequester.builder().resourceId("resource")
.requestorIdentifier("requestor").requestSender(new TestRequestSender(queue, false))
.maxTimeoutMillis(1000).build();
try (ParallelRequester requester = new ParallelRequester(container)) {
Future<Boolean> future = requester.request(10);
for (int i = 0; i < BatchedPermitsRequester.MAX_RETRIES; i++) {
// container will fail 5 times
await(new QueueSize(queue, 1), 1000);
Assert.assertFalse(future.isDone());
failRequestBuilder().requestAndCallback(queue.poll()).fail();
}
// should return a failure
Assert.assertFalse(future.get());
// should not make any more request
Assert.assertEquals(queue.size(), 0);
}
}
@Test(dependsOnMethods = "testSerializeToSequenceFile")
public void testDeserializeFromSequenceFile() throws IOException {
Queue<WorkUnitState> workUnitStates = Queues.newConcurrentLinkedQueue();
Path seqPath1 = new Path(this.outputPath, "seq1");
Path seqPath2 = new Path(this.outputPath, "seq2");
try (ParallelRunner parallelRunner = new ParallelRunner(2, this.fs)) {
parallelRunner.deserializeFromSequenceFile(Text.class, WorkUnitState.class, seqPath1, workUnitStates, true);
parallelRunner.deserializeFromSequenceFile(Text.class, WorkUnitState.class, seqPath2, workUnitStates, true);
}
Assert.assertFalse(this.fs.exists(seqPath1));
Assert.assertFalse(this.fs.exists(seqPath2));
Assert.assertEquals(workUnitStates.size(), 2);
for (WorkUnitState workUnitState : workUnitStates) {
TestWatermark watermark = new Gson().fromJson(workUnitState.getActualHighWatermark(), TestWatermark.class);
Assert.assertTrue(watermark.getLongWatermark() == 10L || watermark.getLongWatermark() == 100L);
}
}
public EventReporter(Builder builder) {
super(builder.context, builder.name, builder.filter, builder.rateUnit, builder.durationUnit);
this.closer = Closer.create();
this.immediateReportExecutor = MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(1,
ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("EventReporter-" + builder.name + "-%d"))),
5, TimeUnit.MINUTES);
this.metricContext = builder.context;
this.notificationTargetKey = builder.context.addNotificationTarget(new Function<Notification, Void>() {
@Nullable
@Override
public Void apply(Notification notification) {
notificationCallback(notification);
return null;
}
});
this.reportingQueue = Queues.newLinkedBlockingQueue(QUEUE_CAPACITY);
}
SelectDispatcher(SelectConfig config) {
super(config);
m_config = config;
CassandraSession session = new CassandraSessionImpl(
config.getCassandraKeyspace(),
config.getCassandraHost(),
config.getCassandraPort(),
config.getCassandraCompression(),
config.getCassandraUsername(),
config.getCassandraPassword(),
config.getCassandraSsl());
m_repository = new CassandraSampleRepository(
session,
Config.CASSANDRA_TTL,
m_metricRegistry,
new DefaultSampleProcessorService(1),
new ContextConfigurations());
m_queryQueue = Queues.newArrayBlockingQueue(config.getThreads() * 10);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
this.remoteAddress = ctx.channel().remoteAddress();
this.ctx = ctx;
isActive.set(true);
requestsQueue = Queues.newConcurrentLinkedQueue();
}
private void registerBodyRequest(DtsResourceManager reousrceManager) {
ResourceMessageHandler messageProcessor = new ResourceMessageHandler(reousrceManager);
BlockingQueue<Runnable> clientThreadPoolQueue = Queues.newLinkedBlockingDeque(100);
ExecutorService clientMessageExecutor =
new ThreadPoolExecutor(nettyClientConfig.getClientCallbackExecutorThreads(),
nettyClientConfig.getClientCallbackExecutorThreads(), 1000 * 60, TimeUnit.MILLISECONDS,
clientThreadPoolQueue, new DtsThreadFactory("ResourceBodyRequestThread_"));
this.remotingClient.registerProcessor(RequestCode.BODY_REQUEST, messageProcessor, clientMessageExecutor);
}
@Test
public void testIdempotency() throws Exception
{
TaskType idempotentType = new TaskType("yes", "1", true);
TaskType nonIdempotentType = new TaskType("no", "1", false);
Task idempotentTask = new Task(new TaskId(), idempotentType);
Task nonIdempotentTask = new Task(new TaskId(), nonIdempotentType);
Task root = new Task(new TaskId(), Lists.newArrayList(idempotentTask, nonIdempotentTask));
Set<TaskId> thrownTasks = Sets.newConcurrentHashSet();
Queue<TaskId> tasks = Queues.newConcurrentLinkedQueue();
TaskExecutor taskExecutor = (m, t) -> () -> {
if ( thrownTasks.add(t.getTaskId()) )
{
throw new RuntimeException();
}
tasks.add(t.getTaskId());
return new TaskExecutionResult(TaskExecutionStatus.SUCCESS, "");
};
WorkflowManager workflowManager = WorkflowManagerBuilder.builder()
.addingTaskExecutor(taskExecutor, 10, idempotentType)
.addingTaskExecutor(taskExecutor, 10, nonIdempotentType)
.withCurator(curator, "test", "1")
.build();
try
{
workflowManager.start();
workflowManager.submitTask(root);
timing.sleepABit();
Assert.assertEquals(tasks.size(), 1);
Assert.assertEquals(tasks.poll(), idempotentTask.getTaskId());
}
finally
{
CloseableUtils.closeQuietly(workflowManager);
}
}
private void registerBodyRequest() {
DtsMessageProcessor messageProcessor = createMessageProcessor();
BlockingQueue<Runnable> resourceThreadPoolQueue = Queues.newLinkedBlockingDeque(10000);
ExecutorService resourceMessageExecutor = new ServerFixedThreadPoolExecutor(cpus, cpus, 1000 * 60,
TimeUnit.MILLISECONDS, resourceThreadPoolQueue, new DtsThreadFactory("ServerBodyThread_"));
this.remotingServer.registerProcessor(RequestCode.BODY_REQUEST, messageProcessor, resourceMessageExecutor);
}
private void registerHeaderRequest() {
DtsMessageProcessor messageProcessor = createMessageProcessor();
BlockingQueue<Runnable> clientThreadPoolQueue = Queues.newLinkedBlockingDeque(10000);
ExecutorService clientMessageExecutor =
new ServerFixedThreadPoolExecutor(cpus * headerRequestCorePoolSizeCpuTimes,
cpus * headerRequestMaximumPoolSizeCpuTimes, headerRequestKeepaliveTime, TimeUnit.MILLISECONDS,
clientThreadPoolQueue, new DtsThreadFactory("ServerHeadRequestThread_"));
this.remotingServer.registerProcessor(RequestCode.HEADER_REQUEST, messageProcessor, clientMessageExecutor);
}
@Inject
StateMachineProxy(@Nonnull @StateExecutor Executor executor, @Nonnull StateMachine stateMachine) {
this.executor = checkNotNull(executor);
this.stateMachine = checkNotNull(stateMachine);
this.operations = Queues.newLinkedBlockingQueue();
this.running = new AtomicBoolean(false);
}
@Override
public void executeApp() throws Exception {
executor =
new ThreadPoolExecutor(1, 1, 60, MILLISECONDS, Queues.newLinkedBlockingQueue());
// need to pre-create threads, otherwise lambda execution will be captured by the
// initial thread run, and won't really test lambda execution capture
executor.prestartAllCoreThreads();
transactionMarker();
}
/**
* @param tableName
* @param auths
* @param delegator
* @param maxResults
*/
public RangeStreamScanner(String tableName, Set<Authorizations> auths, ResourceQueue delegator, int maxResults, Query settings) {
super(tableName, auths, delegator, maxResults, settings);
delegatedResourceInitializer = BatchResource.class;
currentQueue = Queues.newArrayDeque();
readLock = queueLock.readLock();
writeLock = queueLock.writeLock();
myExecutor = MoreExecutors.sameThreadExecutor();
if (null != stats)
initializeTimers();
}
private Queue<Runnable> getWorldQueue(int worldId) {
synchronized (callbacks) {
Queue<Runnable> result = callbacks.get(worldId);
if (result == null) {
result = Queues.newConcurrentLinkedQueue();
callbacks.put(worldId, result);
}
return result;
}
}
public ChunkRenderDispatcherLitematica()
{
int threadLimitMemory = Math.max(1, (int)((double)Runtime.getRuntime().maxMemory() * 0.15D) / 10485760);
int threadLimitCPU = Math.max(1, MathHelper.clamp(Runtime.getRuntime().availableProcessors(), 1, threadLimitMemory / 5));
this.countRenderBuilders = MathHelper.clamp(threadLimitCPU * 8, 1, threadLimitMemory);
if (threadLimitCPU > 1)
{
Litematica.logger.info("Creating {} render threads", threadLimitCPU);
for (int i = 0; i < threadLimitCPU; ++i)
{
ChunkRenderWorkerLitematica worker = new ChunkRenderWorkerLitematica(this);
Thread thread = THREAD_FACTORY.newThread(worker);
thread.start();
this.listThreadedWorkers.add(worker);
this.listWorkerThreads.add(thread);
}
}
Litematica.logger.info("Using {} total BufferBuilder caches", this.countRenderBuilders + 1);
this.queueFreeRenderBuilders = Queues.newArrayBlockingQueue(this.countRenderBuilders);
for (int i = 0; i < this.countRenderBuilders; ++i)
{
this.queueFreeRenderBuilders.add(new BufferBuilderCache());
}
this.renderWorker = new ChunkRenderWorkerLitematica(this, new BufferBuilderCache());
}
public StateHistory(
@NonNull final String sourceName,
@Min(1) final int capacity,
@NonNull final Repository<Collection<SourceState>> repository,
@NonNull final MysqlSourceMetrics metrics) {
Preconditions.checkState(capacity > 0);
this.sourceName = sourceName;
this.capacity = capacity;
this.repository = repository;
this.metrics = metrics;
this.stateHistory = Queues.newArrayDeque(getPreviousStates());
}
@Override
protected void onEnabled() {
chunkLock.lock();
try {
if (max_chunks.get() <= 0) {
chunks = Queues.newArrayDeque();
} else {
chunks = EvictingQueue.create(max_chunks.get());
}
} finally {
chunkLock.unlock();
}
}
public TessellatorCache(int capacity, Supplier<E> supplier) {
cache = Queues.newArrayBlockingQueue(capacity);
// fill the cache
for (int i = 0; i < capacity; i++) {
cache.add(supplier.get());
}
// copy list of the original tessellators to prevent others from being added
originals = ImmutableList.copyOf(cache);
}
/**
* Returns a list of delegate futures that correspond to the futures received in the order that
* they complete. Delegate futures return the same value or throw the same exception as the
* corresponding input future returns/throws.
*
* <p>Cancelling a delegate future has no effect on any input future, since the delegate future
* does not correspond to a specific input future until the appropriate number of input futures
* have completed. At that point, it is too late to cancel the input future. The input future's
* result, which cannot be stored into the cancelled delegate future, is ignored.
*
* @since 17.0
*/
@Beta
@GwtIncompatible // TODO
public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(Iterable<? extends ListenableFuture<? extends T>> futures) {
// A CLQ may be overkill here. We could save some pointers/memory by synchronizing on an
// ArrayDeque
final ConcurrentLinkedQueue<SettableFuture<T>> delegates = Queues.newConcurrentLinkedQueue();
ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
// Using SerializingExecutor here will ensure that each CompletionOrderListener executes
// atomically and therefore that each returned future is guaranteed to be in completion order.
// N.B. there are some cases where the use of this executor could have possibly surprising
// effects when input futures finish at approximately the same time _and_ the output futures
// have directExecutor listeners. In this situation, the listeners may end up running on a
// different thread than if they were attached to the corresponding input future. We believe
// this to be a negligible cost since:
// 1. Using the directExecutor implies that your callback is safe to run on any thread.
// 2. This would likely only be noticeable if you were doing something expensive or blocking on
// a directExecutor listener on one of the output futures which is an antipattern anyway.
SerializingExecutor executor = new SerializingExecutor(directExecutor());
for (final ListenableFuture<? extends T> future : futures) {
SettableFuture<T> delegate = SettableFuture.create();
// Must make sure to add the delegate to the queue first in case the future is already done
delegates.add(delegate);
future.addListener(
new Runnable() {
@Override
public void run() {
delegates.remove().setFuture(future);
}
},
executor);
listBuilder.add(delegate);
}
return listBuilder.build();
}
/**
* Returns a list of delegate futures that correspond to the futures received in the order that
* they complete. Delegate futures return the same value or throw the same exception as the
* corresponding input future returns/throws.
*
* <p>Cancelling a delegate future has no effect on any input future, since the delegate future
* does not correspond to a specific input future until the appropriate number of input futures
* have completed. At that point, it is too late to cancel the input future. The input future's
* result, which cannot be stored into the cancelled delegate future, is ignored.
*
* @since 17.0
*/
@Beta
@GwtIncompatible // TODO
public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(Iterable<? extends ListenableFuture<? extends T>> futures) {
// A CLQ may be overkill here. We could save some pointers/memory by synchronizing on an
// ArrayDeque
final ConcurrentLinkedQueue<SettableFuture<T>> delegates = Queues.newConcurrentLinkedQueue();
ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
// Using SerializingExecutor here will ensure that each CompletionOrderListener executes
// atomically and therefore that each returned future is guaranteed to be in completion order.
// N.B. there are some cases where the use of this executor could have possibly surprising
// effects when input futures finish at approximately the same time _and_ the output futures
// have directExecutor listeners. In this situation, the listeners may end up running on a
// different thread than if they were attached to the corresponding input future. We believe
// this to be a negligible cost since:
// 1. Using the directExecutor implies that your callback is safe to run on any thread.
// 2. This would likely only be noticeable if you were doing something expensive or blocking on
// a directExecutor listener on one of the output futures which is an antipattern anyway.
SerializingExecutor executor = new SerializingExecutor(directExecutor());
for (final ListenableFuture<? extends T> future : futures) {
SettableFuture<T> delegate = SettableFuture.create();
// Must make sure to add the delegate to the queue first in case the future is already done
delegates.add(delegate);
future.addListener(
new Runnable() {
@Override
public void run() {
delegates.remove().setFuture(future);
}
},
executor);
listBuilder.add(delegate);
}
return listBuilder.build();
}
/**
* Returns a list of delegate futures that correspond to the futures received in the order that
* they complete. Delegate futures return the same value or throw the same exception as the
* corresponding input future returns/throws.
*
* <p>Cancelling a delegate future has no effect on any input future, since the delegate future
* does not correspond to a specific input future until the appropriate number of input futures
* have completed. At that point, it is too late to cancel the input future. The input future's
* result, which cannot be stored into the cancelled delegate future, is ignored.
*
* @since 17.0
*/
@Beta
@GwtIncompatible // TODO
public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(Iterable<? extends ListenableFuture<? extends T>> futures) {
// A CLQ may be overkill here. We could save some pointers/memory by synchronizing on an
// ArrayDeque
final ConcurrentLinkedQueue<SettableFuture<T>> delegates = Queues.newConcurrentLinkedQueue();
ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
// Using SerializingExecutor here will ensure that each CompletionOrderListener executes
// atomically and therefore that each returned future is guaranteed to be in completion order.
// N.B. there are some cases where the use of this executor could have possibly surprising
// effects when input futures finish at approximately the same time _and_ the output futures
// have directExecutor listeners. In this situation, the listeners may end up running on a
// different thread than if they were attached to the corresponding input future. We believe
// this to be a negligible cost since:
// 1. Using the directExecutor implies that your callback is safe to run on any thread.
// 2. This would likely only be noticeable if you were doing something expensive or blocking on
// a directExecutor listener on one of the output futures which is an antipattern anyway.
SerializingExecutor executor = new SerializingExecutor(directExecutor());
for (final ListenableFuture<? extends T> future : futures) {
SettableFuture<T> delegate = SettableFuture.create();
// Must make sure to add the delegate to the queue first in case the future is already done
delegates.add(delegate);
future.addListener(
new Runnable() {
@Override
public void run() {
delegates.remove().setFuture(future);
}
},
executor);
listBuilder.add(delegate);
}
return listBuilder.build();
}
/**
* Returns a list of delegate futures that correspond to the futures received in the order that
* they complete. Delegate futures return the same value or throw the same exception as the
* corresponding input future returns/throws.
*
* <p>Cancelling a delegate future has no effect on any input future, since the delegate future
* does not correspond to a specific input future until the appropriate number of input futures
* have completed. At that point, it is too late to cancel the input future. The input future's
* result, which cannot be stored into the cancelled delegate future, is ignored.
*
* @since 17.0
*/
@Beta
@GwtIncompatible // TODO
public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(
Iterable<? extends ListenableFuture<? extends T>> futures) {
// A CLQ may be overkill here. We could save some pointers/memory by synchronizing on an
// ArrayDeque
final ConcurrentLinkedQueue<SettableFuture<T>> delegates = Queues.newConcurrentLinkedQueue();
ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
// Using SerializingExecutor here will ensure that each CompletionOrderListener executes
// atomically and therefore that each returned future is guaranteed to be in completion order.
// N.B. there are some cases where the use of this executor could have possibly surprising
// effects when input futures finish at approximately the same time _and_ the output futures
// have directExecutor listeners. In this situation, the listeners may end up running on a
// different thread than if they were attached to the corresponding input future. We believe
// this to be a negligible cost since:
// 1. Using the directExecutor implies that your callback is safe to run on any thread.
// 2. This would likely only be noticeable if you were doing something expensive or blocking on
// a directExecutor listener on one of the output futures which is an antipattern anyway.
SerializingExecutor executor = new SerializingExecutor(directExecutor());
for (final ListenableFuture<? extends T> future : futures) {
SettableFuture<T> delegate = SettableFuture.create();
// Must make sure to add the delegate to the queue first in case the future is already done
delegates.add(delegate);
future.addListener(
new Runnable() {
@Override
public void run() {
delegates.remove().setFuture(future);
}
},
executor);
listBuilder.add(delegate);
}
return listBuilder.build();
}
private Set<String> index() {
Set<String> visited = Sets.newHashSet();
Queue<Version> toVisit = Queues.newLinkedBlockingDeque();
toVisit.add(rootVersion);
while (!toVisit.isEmpty()) {
Version current = toVisit.poll();
if (visited.add(current.getId()) && current.getChild() != null) {
toVisit.add(current.getChild());
}
}
return visited;
}