类com.google.common.collect.Queues源码实例Demo

下面列出了怎么用com.google.common.collect.Queues的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: SpinalTap   文件: JsonSerializationTest.java
@Test
public void testSerializeStateHistory() throws Exception {
  SourceState firstState = new SourceState(15l, 20l, -1l, BINLOG_FILE_POS);
  SourceState secondState = new SourceState(16l, 21l, -1l, BINLOG_FILE_POS);
  SourceState thirdState = new SourceState(17l, 22l, -1l, BINLOG_FILE_POS);

  Deque<SourceState> stateHistory = Queues.newArrayDeque();
  stateHistory.addLast(firstState);
  stateHistory.addLast(secondState);
  stateHistory.addLast(thirdState);

  Collection<SourceState> states =
      JsonUtil.OBJECT_MAPPER.readValue(
          JsonUtil.OBJECT_MAPPER.writeValueAsString(stateHistory),
          new TypeReference<Collection<SourceState>>() {});

  stateHistory = Queues.newArrayDeque(states);

  assertEquals(3, states.size());
  assertEquals(thirdState, stateHistory.removeLast());
  assertEquals(secondState, stateHistory.removeLast());
  assertEquals(firstState, stateHistory.removeLast());
}
 
源代码2 项目: apollo   文件: ConsumerAuditUtil.java
@Override
public void afterPropertiesSet() throws Exception {
  auditExecutorService.submit(() -> {
    while (!auditStopped.get() && !Thread.currentThread().isInterrupted()) {
      List<ConsumerAudit> toAudit = Lists.newArrayList();
      try {
        Queues.drain(audits, toAudit, BATCH_SIZE, BATCH_TIMEOUT, BATCH_TIMEUNIT);
        if (!toAudit.isEmpty()) {
          consumerService.createConsumerAudits(toAudit);
        }
      } catch (Throwable ex) {
        Tracer.logError(ex);
      }
    }
  });
}
 
源代码3 项目: tracing-framework   文件: PubSubClient.java
PubSubClient(String hostname, int port, int maxPendingMessages) throws IOException {
    this.hostname = hostname;
    this.port = port;
    this.maxPendingMessages = maxPendingMessages;
    if (maxPendingMessages <= 0) {
        this.pending = Queues.newLinkedBlockingDeque();
    } else {
        this.pending = Queues.newLinkedBlockingDeque(maxPendingMessages);
    }
    this.selector = Selector.open();
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            close();
        }
    });
}
 
源代码4 项目: intellij   文件: FilteredTargetMap.java
private ImmutableSet<TargetIdeInfo> targetsForSourceFilesImpl(
    ImmutableMultimap<TargetKey, TargetKey> rdepsMap, Collection<File> sourceFiles) {
  ImmutableSet.Builder<TargetIdeInfo> result = ImmutableSet.builder();
  Set<TargetKey> roots =
      sourceFiles.stream()
          .flatMap(f -> rootsMap.get(f).stream())
          .collect(ImmutableSet.toImmutableSet());

  Queue<TargetKey> todo = Queues.newArrayDeque();
  todo.addAll(roots);
  Set<TargetKey> seen = Sets.newHashSet();
  while (!todo.isEmpty()) {
    TargetKey targetKey = todo.remove();
    if (!seen.add(targetKey)) {
      continue;
    }

    TargetIdeInfo target = targetMap.get(targetKey);
    if (filter.test(target)) {
      result.add(target);
    }
    todo.addAll(rdepsMap.get(targetKey));
  }
  return result.build();
}
 
源代码5 项目: ganttproject   文件: TaskManagerImpl.java
@Override
public void breadthFirstSearch(Task root, Predicate<Pair<Task, Task>> predicate) {
  Preconditions.checkNotNull(root);
  Queue<Task> queue = Queues.newArrayDeque();
  if (predicate.apply(Pair.create((Task) null, root))) {
    queue.add(root);
  }
  while (!queue.isEmpty()) {
    Task head = queue.poll();
    for (Task child : head.getNestedTasks()) {
      if (predicate.apply(Pair.create(head, child))) {
        queue.add(child);
      }
    }
  }
}
 
源代码6 项目: atomix   文件: DefaultDocumentTreeService.java
@Override
public void clear() {
  Queue<DocumentPath> toClearQueue = Queues.newArrayDeque();
  Map<String, Versioned<byte[]>> topLevelChildren = docTree.getChildren(DocumentPath.ROOT);
  toClearQueue.addAll(topLevelChildren.keySet()
      .stream()
      .map(name -> new DocumentPath(name, DocumentPath.ROOT))
      .collect(Collectors.toList()));
  while (!toClearQueue.isEmpty()) {
    DocumentPath path = toClearQueue.remove();
    Map<String, Versioned<byte[]>> children = docTree.getChildren(path);
    if (children.size() == 0) {
      docTree.remove(path);
    } else {
      children.keySet().forEach(name -> toClearQueue.add(new DocumentPath(name, path)));
      toClearQueue.add(path);
    }
  }
}
 
源代码7 项目: xio   文件: Http2To1ProxyRequestQueue.java
public void onRequestWriteOrEnqueue(
    ChannelHandlerContext ctx, Integer streamId, Object request, ChannelPromise promise) {
  if (streamId == null || streamId == Message.H1_STREAM_ID_NONE) {
    log.debug("writing request {}", request);
    ctx.write(request, promise);
  } else {
    boolean shouldWrite =
        currentProxiedH2StreamId().map(id -> id.equals(streamId)).orElse(Boolean.TRUE);

    Queue<PendingRequest> queue =
        streamQueue.computeIfAbsent(streamId, k -> Queues.newArrayDeque());

    if (shouldWrite) {
      log.debug("writing h2-h1 proxy request {}", request);
      ctx.write(request, promise);
    } else {
      log.debug("enqueuing h2-h1 proxy request {}", request);
      queue.offer(new PendingRequest(request, promise));
    }
  }
}
 
@Test
public void testForwardingOfRequests() throws Exception {

  Queue<RequestAndCallback> queue = Queues.newArrayDeque();

  BatchedPermitsRequester container = BatchedPermitsRequester.builder().resourceId("resource")
      .requestorIdentifier("requestor").requestSender(new TestRequestSender(queue, false)).build();
  try (ParallelRequester requester = new ParallelRequester(container)) {

    Future<Boolean> future = requester.request(10);

    await(new QueueSize(queue, 1), 1000);
    Assert.assertEquals(queue.size(), 1);
    satisfyRequestBuilder().requestAndCallback(queue.poll()).satisfy();

    future.get(1, TimeUnit.SECONDS);
    Assert.assertTrue(future.isDone());
    Assert.assertTrue(future.get());
  }
}
 
@Test
public void testRetriableFail() throws Exception {
  Queue<RequestAndCallback> queue = Queues.newArrayDeque();

  BatchedPermitsRequester container = BatchedPermitsRequester.builder().resourceId("resource")
      .requestorIdentifier("requestor").requestSender(new TestRequestSender(queue, false))
      .maxTimeoutMillis(1000).build();
  try (ParallelRequester requester = new ParallelRequester(container)) {

    Future<Boolean> future = requester.request(10);

    for (int i = 0; i < BatchedPermitsRequester.MAX_RETRIES; i++) {
      // container will fail 5 times
      await(new QueueSize(queue, 1), 1000);
      Assert.assertFalse(future.isDone());
      failRequestBuilder().requestAndCallback(queue.poll()).fail();
    }

    // should return a failure
    Assert.assertFalse(future.get());
    // should not make any more request
    Assert.assertEquals(queue.size(), 0);
  }
}
 
源代码10 项目: incubator-gobblin   文件: ParallelRunnerTest.java
@Test(dependsOnMethods = "testSerializeToSequenceFile")
public void testDeserializeFromSequenceFile() throws IOException {
  Queue<WorkUnitState> workUnitStates = Queues.newConcurrentLinkedQueue();

  Path seqPath1 = new Path(this.outputPath, "seq1");
  Path seqPath2 = new Path(this.outputPath, "seq2");

  try (ParallelRunner parallelRunner = new ParallelRunner(2, this.fs)) {
    parallelRunner.deserializeFromSequenceFile(Text.class, WorkUnitState.class, seqPath1, workUnitStates, true);
    parallelRunner.deserializeFromSequenceFile(Text.class, WorkUnitState.class, seqPath2, workUnitStates, true);
  }

  Assert.assertFalse(this.fs.exists(seqPath1));
  Assert.assertFalse(this.fs.exists(seqPath2));

  Assert.assertEquals(workUnitStates.size(), 2);

  for (WorkUnitState workUnitState : workUnitStates) {
    TestWatermark watermark = new Gson().fromJson(workUnitState.getActualHighWatermark(), TestWatermark.class);
    Assert.assertTrue(watermark.getLongWatermark() == 10L || watermark.getLongWatermark() == 100L);
  }
}
 
源代码11 项目: incubator-gobblin   文件: EventReporter.java
public EventReporter(Builder builder) {
  super(builder.context, builder.name, builder.filter, builder.rateUnit, builder.durationUnit);

  this.closer = Closer.create();
  this.immediateReportExecutor = MoreExecutors.getExitingExecutorService(
      (ThreadPoolExecutor) Executors.newFixedThreadPool(1,
          ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("EventReporter-" + builder.name + "-%d"))),
      5, TimeUnit.MINUTES);

  this.metricContext = builder.context;
  this.notificationTargetKey = builder.context.addNotificationTarget(new Function<Notification, Void>() {
    @Nullable
    @Override
    public Void apply(Notification notification) {
      notificationCallback(notification);
      return null;
    }
  });
  this.reportingQueue = Queues.newLinkedBlockingQueue(QUEUE_CAPACITY);
}
 
源代码12 项目: newts   文件: SelectDispatcher.java
SelectDispatcher(SelectConfig config) {
    super(config);

    m_config = config;

    CassandraSession session = new CassandraSessionImpl(
            config.getCassandraKeyspace(),
            config.getCassandraHost(),
            config.getCassandraPort(),
            config.getCassandraCompression(),
            config.getCassandraUsername(),
            config.getCassandraPassword(),
            config.getCassandraSsl());
    m_repository = new CassandraSampleRepository(
            session,
            Config.CASSANDRA_TTL,
            m_metricRegistry,
            new DefaultSampleProcessorService(1),
            new ContextConfigurations());

    m_queryQueue = Queues.newArrayBlockingQueue(config.getThreads() * 10);

}
 
源代码13 项目: kop   文件: KafkaCommandDecoder.java
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx);
    this.remoteAddress = ctx.channel().remoteAddress();
    this.ctx = ctx;
    isActive.set(true);
    requestsQueue = Queues.newConcurrentLinkedQueue();
}
 
源代码14 项目: dts   文件: DefaultDtsResourcMessageSender.java
private void registerBodyRequest(DtsResourceManager reousrceManager) {
    ResourceMessageHandler messageProcessor = new ResourceMessageHandler(reousrceManager);
    BlockingQueue<Runnable> clientThreadPoolQueue = Queues.newLinkedBlockingDeque(100);
    ExecutorService clientMessageExecutor =
        new ThreadPoolExecutor(nettyClientConfig.getClientCallbackExecutorThreads(),
            nettyClientConfig.getClientCallbackExecutorThreads(), 1000 * 60, TimeUnit.MILLISECONDS,
            clientThreadPoolQueue, new DtsThreadFactory("ResourceBodyRequestThread_"));
    this.remotingClient.registerProcessor(RequestCode.BODY_REQUEST, messageProcessor, clientMessageExecutor);
}
 
源代码15 项目: workflow   文件: TestEdges.java
@Test
public void testIdempotency() throws Exception
{
    TaskType idempotentType = new TaskType("yes", "1", true);
    TaskType nonIdempotentType = new TaskType("no", "1", false);

    Task idempotentTask = new Task(new TaskId(), idempotentType);
    Task nonIdempotentTask = new Task(new TaskId(), nonIdempotentType);
    Task root = new Task(new TaskId(), Lists.newArrayList(idempotentTask, nonIdempotentTask));

    Set<TaskId> thrownTasks = Sets.newConcurrentHashSet();
    Queue<TaskId> tasks = Queues.newConcurrentLinkedQueue();
    TaskExecutor taskExecutor = (m, t) -> () -> {
        if ( thrownTasks.add(t.getTaskId()) )
        {
            throw new RuntimeException();
        }
        tasks.add(t.getTaskId());
        return new TaskExecutionResult(TaskExecutionStatus.SUCCESS, "");
    };
    WorkflowManager workflowManager = WorkflowManagerBuilder.builder()
        .addingTaskExecutor(taskExecutor, 10, idempotentType)
        .addingTaskExecutor(taskExecutor, 10, nonIdempotentType)
        .withCurator(curator, "test", "1")
        .build();
    try
    {
        workflowManager.start();
        workflowManager.submitTask(root);

        timing.sleepABit();

        Assert.assertEquals(tasks.size(), 1);
        Assert.assertEquals(tasks.poll(), idempotentTask.getTaskId());
    }
    finally
    {
        CloseableUtils.closeQuietly(workflowManager);
    }
}
 
源代码16 项目: dts   文件: DtsRemotingServer.java
private void registerBodyRequest() {
    DtsMessageProcessor messageProcessor = createMessageProcessor();
    BlockingQueue<Runnable> resourceThreadPoolQueue = Queues.newLinkedBlockingDeque(10000);
    ExecutorService resourceMessageExecutor = new ServerFixedThreadPoolExecutor(cpus, cpus, 1000 * 60,
        TimeUnit.MILLISECONDS, resourceThreadPoolQueue, new DtsThreadFactory("ServerBodyThread_"));
    this.remotingServer.registerProcessor(RequestCode.BODY_REQUEST, messageProcessor, resourceMessageExecutor);
}
 
源代码17 项目: dts   文件: DtsRemotingServer.java
private void registerHeaderRequest() {
    DtsMessageProcessor messageProcessor = createMessageProcessor();
    BlockingQueue<Runnable> clientThreadPoolQueue = Queues.newLinkedBlockingDeque(10000);
    ExecutorService clientMessageExecutor =
        new ServerFixedThreadPoolExecutor(cpus * headerRequestCorePoolSizeCpuTimes,
            cpus * headerRequestMaximumPoolSizeCpuTimes, headerRequestKeepaliveTime, TimeUnit.MILLISECONDS,
            clientThreadPoolQueue, new DtsThreadFactory("ServerHeadRequestThread_"));
    this.remotingServer.registerProcessor(RequestCode.HEADER_REQUEST, messageProcessor, clientMessageExecutor);
}
 
源代码18 项目: barge   文件: StateMachineProxy.java
@Inject
StateMachineProxy(@Nonnull @StateExecutor Executor executor, @Nonnull StateMachine stateMachine) {
  this.executor = checkNotNull(executor);
  this.stateMachine = checkNotNull(stateMachine);
  this.operations = Queues.newLinkedBlockingQueue();
  this.running = new AtomicBoolean(false);
}
 
源代码19 项目: glowroot   文件: ExecutorWithLambdasIT.java
@Override
public void executeApp() throws Exception {
    executor =
            new ThreadPoolExecutor(1, 1, 60, MILLISECONDS, Queues.newLinkedBlockingQueue());
    // need to pre-create threads, otherwise lambda execution will be captured by the
    // initial thread run, and won't really test lambda execution capture
    executor.prestartAllCoreThreads();
    transactionMarker();
}
 
源代码20 项目: datawave   文件: RangeStreamScanner.java
/**
 * @param tableName
 * @param auths
 * @param delegator
 * @param maxResults
 */
public RangeStreamScanner(String tableName, Set<Authorizations> auths, ResourceQueue delegator, int maxResults, Query settings) {
    super(tableName, auths, delegator, maxResults, settings);
    delegatedResourceInitializer = BatchResource.class;
    currentQueue = Queues.newArrayDeque();
    readLock = queueLock.readLock();
    writeLock = queueLock.writeLock();
    myExecutor = MoreExecutors.sameThreadExecutor();
    if (null != stats)
        initializeTimers();
}
 
源代码21 项目: OpenModsLib   文件: DelayedActionTickHandler.java
private Queue<Runnable> getWorldQueue(int worldId) {
	synchronized (callbacks) {
		Queue<Runnable> result = callbacks.get(worldId);

		if (result == null) {
			result = Queues.newConcurrentLinkedQueue();
			callbacks.put(worldId, result);
		}

		return result;
	}
}
 
public ChunkRenderDispatcherLitematica()
{
    int threadLimitMemory = Math.max(1, (int)((double)Runtime.getRuntime().maxMemory() * 0.15D) / 10485760);
    int threadLimitCPU = Math.max(1, MathHelper.clamp(Runtime.getRuntime().availableProcessors(), 1, threadLimitMemory / 5));
    this.countRenderBuilders = MathHelper.clamp(threadLimitCPU * 8, 1, threadLimitMemory);

    if (threadLimitCPU > 1)
    {
        Litematica.logger.info("Creating {} render threads", threadLimitCPU);

        for (int i = 0; i < threadLimitCPU; ++i)
        {
            ChunkRenderWorkerLitematica worker = new ChunkRenderWorkerLitematica(this);
            Thread thread = THREAD_FACTORY.newThread(worker);
            thread.start();
            this.listThreadedWorkers.add(worker);
            this.listWorkerThreads.add(thread);
        }
    }

    Litematica.logger.info("Using {} total BufferBuilder caches", this.countRenderBuilders + 1);

    this.queueFreeRenderBuilders = Queues.newArrayBlockingQueue(this.countRenderBuilders);

    for (int i = 0; i < this.countRenderBuilders; ++i)
    {
        this.queueFreeRenderBuilders.add(new BufferBuilderCache());
    }

    this.renderWorker = new ChunkRenderWorkerLitematica(this, new BufferBuilderCache());
}
 
源代码23 项目: SpinalTap   文件: StateHistory.java
public StateHistory(
    @NonNull final String sourceName,
    @Min(1) final int capacity,
    @NonNull final Repository<Collection<SourceState>> repository,
    @NonNull final MysqlSourceMetrics metrics) {
  Preconditions.checkState(capacity > 0);

  this.sourceName = sourceName;
  this.capacity = capacity;
  this.repository = repository;
  this.metrics = metrics;
  this.stateHistory = Queues.newArrayDeque(getPreviousStates());
}
 
源代码24 项目: ForgeHax   文件: ChunkLogger.java
@Override
protected void onEnabled() {
  chunkLock.lock();
  try {
    if (max_chunks.get() <= 0) {
      chunks = Queues.newArrayDeque();
    } else {
      chunks = EvictingQueue.create(max_chunks.get());
    }
  } finally {
    chunkLock.unlock();
  }
}
 
源代码25 项目: ForgeHax   文件: TessellatorCache.java
public TessellatorCache(int capacity, Supplier<E> supplier) {
  cache = Queues.newArrayBlockingQueue(capacity);
  
  // fill the cache
  for (int i = 0; i < capacity; i++) {
    cache.add(supplier.get());
  }
  
  // copy list of the original tessellators to prevent others from being added
  originals = ImmutableList.copyOf(cache);
}
 
源代码26 项目: codebuff   文件: Futures.java
/**
 * Returns a list of delegate futures that correspond to the futures received in the order that
 * they complete. Delegate futures return the same value or throw the same exception as the
 * corresponding input future returns/throws.
 *
 * <p>Cancelling a delegate future has no effect on any input future, since the delegate future
 * does not correspond to a specific input future until the appropriate number of input futures
 * have completed. At that point, it is too late to cancel the input future. The input future's
 * result, which cannot be stored into the cancelled delegate future, is ignored.
 *
 * @since 17.0
 */

@Beta
@GwtIncompatible // TODO
public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(Iterable<? extends ListenableFuture<? extends T>> futures) {
  // A CLQ may be overkill here. We could save some pointers/memory by synchronizing on an
  // ArrayDeque
  final ConcurrentLinkedQueue<SettableFuture<T>> delegates = Queues.newConcurrentLinkedQueue();
  ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
  // Using SerializingExecutor here will ensure that each CompletionOrderListener executes
  // atomically and therefore that each returned future is guaranteed to be in completion order.
  // N.B. there are some cases where the use of this executor could have possibly surprising
  // effects when input futures finish at approximately the same time _and_ the output futures
  // have directExecutor listeners. In this situation, the listeners may end up running on a
  // different thread than if they were attached to the corresponding input future. We believe
  // this to be a negligible cost since:
  // 1. Using the directExecutor implies that your callback is safe to run on any thread.
  // 2. This would likely only be noticeable if you were doing something expensive or blocking on
  //    a directExecutor listener on one of the output futures which is an antipattern anyway.
  SerializingExecutor executor = new SerializingExecutor(directExecutor());
  for (final ListenableFuture<? extends T> future : futures) {
    SettableFuture<T> delegate = SettableFuture.create();
    // Must make sure to add the delegate to the queue first in case the future is already done
    delegates.add(delegate);
    future.addListener(
      new Runnable() {
        @Override
        public void run() {
          delegates.remove().setFuture(future);
        }
      },
      executor);
    listBuilder.add(delegate);
  }
  return listBuilder.build();
}
 
源代码27 项目: codebuff   文件: Futures.java
/**
 * Returns a list of delegate futures that correspond to the futures received in the order that
 * they complete. Delegate futures return the same value or throw the same exception as the
 * corresponding input future returns/throws.
 *
 * <p>Cancelling a delegate future has no effect on any input future, since the delegate future
 * does not correspond to a specific input future until the appropriate number of input futures
 * have completed. At that point, it is too late to cancel the input future. The input future's
 * result, which cannot be stored into the cancelled delegate future, is ignored.
 *
 * @since 17.0
 */

@Beta
@GwtIncompatible // TODO
public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(Iterable<? extends ListenableFuture<? extends T>> futures) {
  // A CLQ may be overkill here. We could save some pointers/memory by synchronizing on an
  // ArrayDeque
  final ConcurrentLinkedQueue<SettableFuture<T>> delegates = Queues.newConcurrentLinkedQueue();
  ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
  // Using SerializingExecutor here will ensure that each CompletionOrderListener executes
  // atomically and therefore that each returned future is guaranteed to be in completion order.
  // N.B. there are some cases where the use of this executor could have possibly surprising
  // effects when input futures finish at approximately the same time _and_ the output futures
  // have directExecutor listeners. In this situation, the listeners may end up running on a
  // different thread than if they were attached to the corresponding input future. We believe
  // this to be a negligible cost since:
  // 1. Using the directExecutor implies that your callback is safe to run on any thread.
  // 2. This would likely only be noticeable if you were doing something expensive or blocking on
  //    a directExecutor listener on one of the output futures which is an antipattern anyway.
  SerializingExecutor executor = new SerializingExecutor(directExecutor());
  for (final ListenableFuture<? extends T> future : futures) {
    SettableFuture<T> delegate = SettableFuture.create();
    // Must make sure to add the delegate to the queue first in case the future is already done
    delegates.add(delegate);
    future.addListener(
      new Runnable() {
        @Override
        public void run() {
          delegates.remove().setFuture(future);
        }
      },
      executor);
    listBuilder.add(delegate);
  }
  return listBuilder.build();
}
 
源代码28 项目: codebuff   文件: Futures.java
/**
 * Returns a list of delegate futures that correspond to the futures received in the order that
 * they complete. Delegate futures return the same value or throw the same exception as the
 * corresponding input future returns/throws.
 *
 * <p>Cancelling a delegate future has no effect on any input future, since the delegate future
 * does not correspond to a specific input future until the appropriate number of input futures
 * have completed. At that point, it is too late to cancel the input future. The input future's
 * result, which cannot be stored into the cancelled delegate future, is ignored.
 *
 * @since 17.0
 */

@Beta
@GwtIncompatible // TODO
public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(Iterable<? extends ListenableFuture<? extends T>> futures) {
  // A CLQ may be overkill here. We could save some pointers/memory by synchronizing on an
  // ArrayDeque
  final ConcurrentLinkedQueue<SettableFuture<T>> delegates = Queues.newConcurrentLinkedQueue();
  ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
  // Using SerializingExecutor here will ensure that each CompletionOrderListener executes
  // atomically and therefore that each returned future is guaranteed to be in completion order.
  // N.B. there are some cases where the use of this executor could have possibly surprising
  // effects when input futures finish at approximately the same time _and_ the output futures
  // have directExecutor listeners. In this situation, the listeners may end up running on a
  // different thread than if they were attached to the corresponding input future. We believe
  // this to be a negligible cost since:
  // 1. Using the directExecutor implies that your callback is safe to run on any thread.
  // 2. This would likely only be noticeable if you were doing something expensive or blocking on
  //    a directExecutor listener on one of the output futures which is an antipattern anyway.
  SerializingExecutor executor = new SerializingExecutor(directExecutor());
  for (final ListenableFuture<? extends T> future : futures) {
    SettableFuture<T> delegate = SettableFuture.create();
    // Must make sure to add the delegate to the queue first in case the future is already done
    delegates.add(delegate);
    future.addListener(
      new Runnable() {
        @Override
        public void run() {
          delegates.remove().setFuture(future);
        }
      },
      executor);
    listBuilder.add(delegate);
  }
  return listBuilder.build();
}
 
源代码29 项目: codebuff   文件: Futures.java
/**
 * Returns a list of delegate futures that correspond to the futures received in the order that
 * they complete. Delegate futures return the same value or throw the same exception as the
 * corresponding input future returns/throws.
 *
 * <p>Cancelling a delegate future has no effect on any input future, since the delegate future
 * does not correspond to a specific input future until the appropriate number of input futures
 * have completed. At that point, it is too late to cancel the input future. The input future's
 * result, which cannot be stored into the cancelled delegate future, is ignored.
 *
 * @since 17.0
 */
@Beta
@GwtIncompatible // TODO
public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(
    Iterable<? extends ListenableFuture<? extends T>> futures) {
  // A CLQ may be overkill here. We could save some pointers/memory by synchronizing on an
  // ArrayDeque
  final ConcurrentLinkedQueue<SettableFuture<T>> delegates = Queues.newConcurrentLinkedQueue();
  ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
  // Using SerializingExecutor here will ensure that each CompletionOrderListener executes
  // atomically and therefore that each returned future is guaranteed to be in completion order.
  // N.B. there are some cases where the use of this executor could have possibly surprising
  // effects when input futures finish at approximately the same time _and_ the output futures
  // have directExecutor listeners. In this situation, the listeners may end up running on a
  // different thread than if they were attached to the corresponding input future. We believe
  // this to be a negligible cost since:
  // 1. Using the directExecutor implies that your callback is safe to run on any thread.
  // 2. This would likely only be noticeable if you were doing something expensive or blocking on
  //    a directExecutor listener on one of the output futures which is an antipattern anyway.
  SerializingExecutor executor = new SerializingExecutor(directExecutor());
  for (final ListenableFuture<? extends T> future : futures) {
    SettableFuture<T> delegate = SettableFuture.create();
    // Must make sure to add the delegate to the queue first in case the future is already done
    delegates.add(delegate);
    future.addListener(
        new Runnable() {
          @Override
          public void run() {
            delegates.remove().setFuture(future);
          }
        },
        executor);
    listBuilder.add(delegate);
  }
  return listBuilder.build();
}
 
源代码30 项目: quantumdb   文件: VersionIdGenerator.java
private Set<String> index() {
	Set<String> visited = Sets.newHashSet();
	Queue<Version> toVisit = Queues.newLinkedBlockingDeque();
	toVisit.add(rootVersion);

	while (!toVisit.isEmpty()) {
		Version current = toVisit.poll();
		if (visited.add(current.getId()) && current.getChild() != null) {
			toVisit.add(current.getChild());
		}
	}

	return visited;
}
 
 同包方法