com.google.common.collect.Iterators#partition() Source Code Examples

Listed below are example usages of com.google.common.collect.Iterators#partition() collected from open-source projects; follow each project link to view the full source on GitHub.
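Before the project samples, a minimal sketch of the API itself (the class and variable names here are illustrative, not from any project below): Iterators.partition(Iterator<T>, int) wraps an iterator in an UnmodifiableIterator<List<T>> that yields lists of at most the given size; only the final list may be shorter.

import com.google.common.collect.Iterators;
import com.google.common.collect.UnmodifiableIterator;

import java.util.Arrays;
import java.util.List;

public class PartitionDemo {
    public static void main(String[] args) {
        List<String> letters = Arrays.asList("a", "b", "c", "d", "e");

        // Chunks hold at most 2 elements; the trailing chunk may be smaller.
        UnmodifiableIterator<List<String>> chunks =
                Iterators.partition(letters.iterator(), 2);

        while (chunks.hasNext()) {
            System.out.println(chunks.next()); // [a, b] then [c, d] then [e]
        }
    }
}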

Example 1 Project: neoscada   File: AbstractEventQueryImpl.java
@Override
public synchronized void addListener ( final EventListener eventListener )
{
    this.listeners.add ( eventListener );

    // replay the already-queued events to the new listener in chunks of chunkSize
    final UnmodifiableIterator<List<Event>> it = Iterators.partition ( AbstractEventQueryImpl.this.events.iterator (), chunkSize );
    while ( it.hasNext () )
    {
        final List<org.eclipse.scada.ae.Event> chunk = it.next ();
        this.executor.execute ( new Runnable () {

            @Override
            public void run ()
            {
                eventListener.handleEvent ( chunk );
            }
        } );
    }

}
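Since each chunk reference is final, the same hand-off could be written with a lambda on Java 8+; a sketch with behavior unchanged:

    this.executor.execute ( () -> eventListener.handleEvent ( chunk ) );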
 
Example 2 Project: emodb   File: PersistentSortedQueue.java
/** Move/copy segment records from a segment that isn't in segmentMap to the segments that are. */
private void moveRecords(Segment seg, ByteBuffer max, boolean deleteFromSource) {
    checkWritesAllowed();
    ByteBuffer from = seg.getMin();
    ByteBuffer toExclusive = successor(max);

    int batchSize = scanBatchSize();
    Iterator<List<ByteBuffer>> batchIter = Iterators.partition(
            _dao.scanRecords(seg.getDataId(), from, toExclusive, batchSize, Integer.MAX_VALUE),
            batchSize);
    while (batchIter.hasNext()) {
        List<ByteBuffer> records = batchIter.next();

        // Write to the destination.  Go through addAll() to update stats, do splitting, etc.
        addAll(records);

        // Delete individual records from the source.
        if (deleteFromSource) {
            _dao.prepareUpdate(_name).deleteRecords(seg.getDataId(), records).execute();
        }
    }

    seg.setMin(toExclusive);
}
 
Example 3 Project: emodb   File: DefaultDedupEventStore.java
/**
 * Copies events matching the specified predicate from one dedup queue to another.
 * <p>
 * Note: this method expects both "from" and "to" to be dedup queues.  If the "from" queue is not, use
 * {@link #copyFromRawChannel} instead to avoid starting a DedupQueueService for "from" that will
 * drain it and move its data to a sorted queue.
 */
@Override
public void copy(String from, String to, Predicate<ByteBuffer> filter, Date since) {
    checkNotNull(from, "from");
    checkNotNull(to, "to");

    ScanSink sink = newCopySink(to);

    _delegate.scan(_channels.writeChannel(from), filter, sink, COPY_BATCH_SIZE, since);

    SortedQueue source = getQueueReadOnly(from, SERVICE_SLOW_WAIT_DURATION);
    Iterator<List<ByteBuffer>> it = Iterators.partition(source.scan(null, Long.MAX_VALUE), COPY_BATCH_SIZE);
    while (it.hasNext()) {
        List<ByteBuffer> events = it.next();
        sink.accept(ImmutableList.copyOf(Iterables.filter(events, filter)));  // Copy so filter is evaluated only once per record.
    }

    _delegate.scan(_channels.readChannel(from), filter, sink, COPY_BATCH_SIZE, since);
}
 
Example 4 Project: monsoon   File: InfluxHistory.java
/** Flattens each collection into (timestamp, value) entries, then batches them for writing. */
private UnmodifiableIterator<List<Map.Entry<DateTime, TimeSeriesValue>>> asBatchPointIteration_(@NonNull Iterator<? extends TimeSeriesCollection> i) {
    return Iterators.partition(
            Iterators.concat(
                    Iterators.transform(i, tsdata -> {
                        final DateTime timestamp = tsdata.getTimestamp();
                        LOG.log(Level.INFO, "Preparing TSData {0} for write", timestamp);
                        return Iterators.transform(tsdata.getTSValues().iterator(),
                                tsv -> SimpleMapEntry.create(timestamp, tsv));
                    })),
            targetBatchSize);
}
 
Example 5 Project: emodb   File: PersistentSortedQueueTest.java
/** Draws {@code n} buffers from the generator and adds them, in batches, to the queue and both tracking collections. */
private void addBuffers(SortedQueue actual, Collection<ByteBuffer> expected, Collection<ByteBuffer> all,
                        int n, int batchSize, Iterator<ByteBuffer> generatorIter) {
    Iterator<List<ByteBuffer>> batchIter =
            Iterators.partition(Iterators.limit(generatorIter, n), batchSize);
    while (batchIter.hasNext()) {
        List<ByteBuffer> batch = batchIter.next();
        actual.addAll(batch);
        expected.addAll(batch);
        all.addAll(batch);
    }
}
 
Example 6 Project: molgenis   File: EntityManagerImpl.java
@Override
public Stream<Entity> resolveReferences(
    EntityType entityType, Stream<Entity> entities, Fetch fetch) {
  // resolve lazy entity collections without references
  if (entities instanceof EntityStream && ((EntityStream) entities).isLazy()) {
    // TODO remove cast after updating DataService/Repository interfaces to return EntityStream
    return dataService.findAll(entityType.getId(), entities.map(Entity::getIdValue), fetch);
  }

  // no fetch exists that describes what to resolve
  if (fetch == null) {
    return entities;
  }
  List<Attribute> resolvableAttrs = getResolvableAttrs(entityType, fetch);

  // entity has no references, nothing to resolve
  if (resolvableAttrs.isEmpty()) {
    return entities;
  }

  Iterable<List<Entity>> iterable = () -> Iterators.partition(entities.iterator(), BATCH_SIZE);
  return Streams.stream(iterable)
      .flatMap(
          batch -> {
            List<Entity> batchWithReferences = resolveReferences(resolvableAttrs, batch, fetch);
            return batchWithReferences.stream();
          });
}
 
Example 7 Project: molgenis   File: L2CacheRepositoryDecorator.java
@SuppressWarnings("UnstableApiUsage")
@Override
public Stream<Entity> findAll(Stream<Object> ids) {
  if (doRetrieveFromCache()) {
    Iterator<List<Object>> batches = Iterators.partition(ids.iterator(), ID_BATCH_SIZE);
    Iterable<List<Object>> iterable = () -> batches;
    return Streams.stream(iterable).flatMap(batch -> findAllCache(batch).stream());
  } else {
    return delegate().findAll(ids);
  }
}
 
Example 8 Project: molgenis   File: L2CacheRepositoryDecorator.java
@SuppressWarnings("UnstableApiUsage")
@Override
public Stream<Entity> findAll(Stream<Object> ids, Fetch fetch) {
  if (doRetrieveFromCache()) {
    Iterator<List<Object>> batches = Iterators.partition(ids.iterator(), ID_BATCH_SIZE);
    Iterable<List<Object>> iterable = () -> batches;
    return Streams.stream(iterable).flatMap(batch -> findAllCache(batch, fetch).stream());
  } else {
    return delegate().findAll(ids, fetch);
  }
}
 
Example 9 Project: neoscada   File: EventPoolImpl.java
private void loadFromStorage ()
{
    // load initial set from storage, but restrict it to *daysToRetrieve* days
    try
    {
        final long t = System.currentTimeMillis ();
        // retrieve data per day, to restrict database load
        for ( int daysBack = 1; daysBack <= daysToRetrieve; daysBack++ )
        {
            final Calendar calStart = new GregorianCalendar ();
            final Calendar calEnd = new GregorianCalendar ();
            calStart.setTimeInMillis ( t );
            calStart.add ( Calendar.DAY_OF_YEAR, -daysBack );
            calEnd.setTimeInMillis ( t );
            calEnd.add ( Calendar.DAY_OF_YEAR, -daysBack + 1 );
            final StringBuilder filter = new StringBuilder ();
            filter.append ( "(&" );
            if ( this.filter != null )
            {
                filter.append ( this.filter );
            }
            filter.append ( "(sourceTimestamp>=" + isoDateFormat.format ( calStart.getTime () ) + ")" );
            if ( daysBack > 1 )
            {
                filter.append ( "(sourceTimestamp<" + isoDateFormat.format ( calEnd.getTime () ) + ")" );
            }
            filter.append ( ")" );
            logger.debug ( "load events from filter: " + filter );
            final Query query = this.storage.query ( filter.toString () );
            try
            {
                int count;
                synchronized ( this )
                {
                    count = this.events.getCapacity ();
                }

                final Collection<Event> result = query.getNext ( count );

                logger.debug ( "Loaded {} entries from storage", result.size () );
                synchronized ( this )
                {
                    this.events.addAll ( result );

                    final UnmodifiableIterator<List<Event>> it = Iterators.partition ( this.events.iterator (), chunkSize );
                    while ( it.hasNext () )
                    {
                        final List<org.eclipse.scada.ae.Event> chunk = it.next ();
                        notifyEvent ( chunk );
                    }
                }
            }
            finally
            {
                query.dispose ();
            }
            if ( this.events.size () >= this.events.getCapacity () )
            {
                return;
            }
        }
        logger.debug ( "load of events complete" );
    }
    catch ( final Exception e )
    {
        logger.error ( "loadFromStorage failed", e );
    }
}
 
Example 10 Project: more-lambdas-java   File: MoreStreams.java
public static <T> Stream<List<T>> partition(Stream<T> stream, int size) {
    Iterable<List<T>> iterable = () -> Iterators.partition(stream.iterator(), size);
    return StreamSupport.stream(iterable.spliterator(), false);
}
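A hypothetical caller of the helper above (the values and class name are illustrative). Note that StreamSupport.stream is passed false for the parallel flag, so the returned stream is sequential; that is the safe choice here because the source iterator is stateful.

import java.util.stream.Stream;

public class MoreStreamsDemo {
    public static void main(String[] args) {
        // Prints [1, 2] then [3, 4] then [5]; the final batch may be smaller.
        MoreStreams.partition(Stream.of(1, 2, 3, 4, 5), 2)
                .forEach(System.out::println);
    }
}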
 
Example 11 Project: molgenis   File: PackagePersister.java
@Transactional
public void upsertPackages(Stream<Package> packages) {
  // partition the stream into batches of BATCH_SIZE and delegate to the List-based overload
  Iterator<List<Package>> partitions = Iterators.partition(packages.iterator(), BATCH_SIZE);
  partitions.forEachRemaining(this::upsertPackages);
}
 
Example 12 Project: molgenis   File: AbstractRepository.java
@Override
public Stream<Entity> findAll(Stream<Object> ids, Fetch fetch) {
  Iterator<List<Object>> batches = Iterators.partition(ids.iterator(), FIND_ALL_BATCH_SIZE);
  Iterable<List<Object>> iterable = () -> batches;
  return stream(iterable).flatMap(batch -> stream(findAllBatched(batch, fetch)));
}
 
Example 13 Project: ffwd   File: KafkaPluginSink.java
/**
 * Convert the given message iterator to an iterator of batches of a specific size.
 * <p>
 * This is an attempt to reduce the maximum amount of live memory required at any single time.
 *
 * @param iterator Iterator to convert into batches.
 * @return an iterator over batches of at most {@code batchSize} messages each.
 */
private Iterator<List<KeyedMessage<Integer, byte[]>>> toBatches(
    final Iterator<KeyedMessage<Integer, byte[]>> iterator
) {
    return Iterators.partition(iterator, batchSize);
}
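A sketch of how such a batch iterator might be drained; messages and producer.send are assumed names for illustration, not code shown from KafkaPluginSink. Because Iterators.partition is lazy, only one batch of batchSize messages is materialized at a time:

// Hypothetical consumption loop: messages and producer are assumed to exist.
Iterator<List<KeyedMessage<Integer, byte[]>>> batches = toBatches(messages);
while (batches.hasNext()) {
    producer.send(batches.next()); // only the current batch is held in memory
}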