Listed below are example usages of com.google.common.collect.Iterators#partition(), collected from open-source projects.
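Before the project examples, here is a minimal self-contained sketch of the core behavior (the class name PartitionExample is ours, not from any of the projects below): Iterators.partition splits an iterator into consecutive sublists of the requested size, with the final sublist possibly smaller when the input does not divide evenly.
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.UnmodifiableIterator;
import java.util.List;

public class PartitionExample {
    public static void main(String[] args) {
        // Partition five elements into chunks of two; the last chunk holds the remainder.
        UnmodifiableIterator<List<Integer>> it =
                Iterators.partition(ImmutableList.of(1, 2, 3, 4, 5).iterator(), 2);
        while (it.hasNext()) {
            System.out.println(it.next()); // prints [1, 2], then [3, 4], then [5]
        }
    }
}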
@Override
public synchronized void addListener ( final EventListener eventListener )
{
this.listeners.add ( eventListener );
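// Replay the current event backlog to the newly added listener in chunks of chunkSize,
// dispatching each chunk asynchronously on the executor.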
final UnmodifiableIterator<List<Event>> it = Iterators.partition ( AbstractEventQueryImpl.this.events.iterator (), chunkSize );
while ( it.hasNext () )
{
final List<org.eclipse.scada.ae.Event> chunk = it.next ();
this.executor.execute ( new Runnable () {
@Override
public void run ()
{
eventListener.handleEvent ( chunk );
}
} );
}
}
/** Move/copy segment records from a segment that isn't in segmentMap to the segments that are. */
private void moveRecords(Segment seg, ByteBuffer max, boolean deleteFromSource) {
checkWritesAllowed();
ByteBuffer from = seg.getMin();
ByteBuffer toExclusive = successor(max);
int batchSize = scanBatchSize();
Iterator<List<ByteBuffer>> batchIter = Iterators.partition(
_dao.scanRecords(seg.getDataId(), from, toExclusive, batchSize, Integer.MAX_VALUE),
batchSize);
while (batchIter.hasNext()) {
List<ByteBuffer> records = batchIter.next();
// Write to the destination. Go through addAll() to update stats, do splitting, etc.
addAll(records);
// Delete individual records from the source.
if (deleteFromSource) {
_dao.prepareUpdate(_name).deleteRecords(seg.getDataId(), records).execute();
}
}
seg.setMin(toExclusive);
}
/**
* Copies events matching the specified predicate from one dedup queue to another.
* <p>
* Note: this method expects both "from" and "to" to be dedup queues. If the "from" queue is not, use
* {@link #copyFromRawChannel} instead to avoid starting a DedupQueueService for "from" that will
* drain it and move its data to a sorted queue.
*/
@Override
public void copy(String from, String to, Predicate<ByteBuffer> filter, Date since) {
checkNotNull(from, "from");
checkNotNull(to, "to");
ScanSink sink = newCopySink(to);
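// Copy in three passes: the raw write channel, the persisted sorted queue, then the read channel.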
_delegate.scan(_channels.writeChannel(from), filter, sink, COPY_BATCH_SIZE, since);
SortedQueue source = getQueueReadOnly(from, SERVICE_SLOW_WAIT_DURATION);
Iterator<List<ByteBuffer>> it = Iterators.partition(source.scan(null, Long.MAX_VALUE), COPY_BATCH_SIZE);
while (it.hasNext()) {
List<ByteBuffer> events = it.next();
sink.accept(ImmutableList.copyOf(Iterables.filter(events, filter))); // Copy so filter is evaluated only once per record.
}
_delegate.scan(_channels.readChannel(from), filter, sink, COPY_BATCH_SIZE, since);
}
private UnmodifiableIterator<List<Map.Entry<DateTime, TimeSeriesValue>>> asBatchPointIteration_(@NonNull Iterator<? extends TimeSeriesCollection> i) {
return Iterators.partition(
Iterators.concat(
Iterators.transform(i, tsdata -> {
final DateTime timestamp = tsdata.getTimestamp();
LOG.log(Level.INFO, "Preparing TSData {0} for write", timestamp);
return Iterators.transform(tsdata.getTSValues().iterator(),
tsv -> SimpleMapEntry.create(timestamp, tsv));
})),
targetBatchSize);
}
private void addBuffers(SortedQueue actual, Collection<ByteBuffer> expected, Collection<ByteBuffer> all,
int n, int batchSize, Iterator<ByteBuffer> generatorIter) {
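// Draw the next n buffers from the generator and add them, batchSize at a time,
// to the queue under test and to the tracking collections.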
Iterator<List<ByteBuffer>> batchIter =
Iterators.partition(Iterators.limit(generatorIter, n), batchSize);
while (batchIter.hasNext()) {
List<ByteBuffer> batch = batchIter.next();
actual.addAll(batch);
expected.addAll(batch);
all.addAll(batch);
}
}
@Override
public Stream<Entity> resolveReferences(
EntityType entityType, Stream<Entity> entities, Fetch fetch) {
// resolve lazy entity collections without references
if (entities instanceof EntityStream && ((EntityStream) entities).isLazy()) {
// TODO remove cast after updating DataService/Repository interfaces to return EntityStream
return dataService.findAll(entityType.getId(), entities.map(Entity::getIdValue), fetch);
}
// no fetch exists that describes what to resolve
if (fetch == null) {
return entities;
}
List<Attribute> resolvableAttrs = getResolvableAttrs(entityType, fetch);
// entity has no references, nothing to resolve
if (resolvableAttrs.isEmpty()) {
return entities;
}
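// Adapt the batch iterator to an Iterable so it can be wrapped in a Stream;
// the Iterable is single-use because the underlying entity stream is.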
Iterable<List<Entity>> iterable = () -> Iterators.partition(entities.iterator(), BATCH_SIZE);
return Streams.stream(iterable)
.flatMap(
batch -> {
List<Entity> batchWithReferences = resolveReferences(resolvableAttrs, batch, fetch);
return batchWithReferences.stream();
});
}
@SuppressWarnings("UnstableApiUsage")
@Override
public Stream<Entity> findAll(Stream<Object> ids) {
if (doRetrieveFromCache()) {
Iterator<List<Object>> batches = Iterators.partition(ids.iterator(), ID_BATCH_SIZE);
Iterable<List<Object>> iterable = () -> batches;
return Streams.stream(iterable).flatMap(batch -> findAllCache(batch).stream());
} else {
return delegate().findAll(ids);
}
}
@SuppressWarnings("UnstableApiUsage")
@Override
public Stream<Entity> findAll(Stream<Object> ids, Fetch fetch) {
if (doRetrieveFromCache()) {
Iterator<List<Object>> batches = Iterators.partition(ids.iterator(), ID_BATCH_SIZE);
Iterable<List<Object>> iterable = () -> batches;
return Streams.stream(iterable).flatMap(batch -> findAllCache(batch, fetch).stream());
} else {
return delegate().findAll(ids, fetch);
}
}
private void loadFromStorage ()
{
// load initial set from storage, but restrict it to *daysToRetrieve* days
try
{
final long t = System.currentTimeMillis ();
// retrieve data per day, to restrict database load
for ( int daysBack = 1; daysBack <= daysToRetrieve; daysBack++ )
{
final Calendar calStart = new GregorianCalendar ();
final Calendar calEnd = new GregorianCalendar ();
calStart.setTimeInMillis ( t );
calStart.add ( Calendar.DAY_OF_YEAR, -daysBack );
calEnd.setTimeInMillis ( t );
calEnd.add ( Calendar.DAY_OF_YEAR, -daysBack + 1 );
final StringBuilder filter = new StringBuilder ();
filter.append ( "(&" );
if ( this.filter != null )
{
filter.append ( this.filter );
}
filter.append ( "(sourceTimestamp>=" + isoDateFormat.format ( calStart.getTime () ) + ")" );
if ( daysBack > 1 )
{
filter.append ( "(sourceTimestamp<" + isoDateFormat.format ( calEnd.getTime () ) + ")" );
}
filter.append ( ")" );
logger.debug ( "load events from filter: " + filter );
final Query query = this.storage.query ( filter.toString () );
try
{
int count;
synchronized ( this )
{
count = this.events.getCapacity ();
}
final Collection<Event> result = query.getNext ( count );
logger.debug ( "Loaded {} entries from storage", result.size () );
synchronized ( this )
{
this.events.addAll ( result );
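// Notify listeners of the updated event set in chunks of chunkSize.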
final UnmodifiableIterator<List<Event>> it = Iterators.partition ( this.events.iterator (), chunkSize );
while ( it.hasNext () )
{
final List<org.eclipse.scada.ae.Event> chunk = it.next ();
notifyEvent ( chunk );
}
}
}
finally
{
query.dispose ();
}
if ( this.events.size () >= this.events.getCapacity () )
{
return;
}
}
logger.debug ( "load of events complete" );
}
catch ( final Exception e )
{
logger.error ( "loadFromStorage failed", e );
}
}
public static <T> Stream<List<T>> partition(Stream<T> stream, int size) {
Iterable<List<T>> iterable = () -> Iterators.partition(stream.iterator(), size);
return StreamSupport.stream(iterable.spliterator(), false);
}
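A usage sketch for the helper above (illustrative, not from the original source). Note that the resulting stream of batches, like the input stream, can be traversed only once:
// Split a stream into batches of two; prints [a, b], then [c, d], then [e].
partition(Stream.of("a", "b", "c", "d", "e"), 2).forEach(System.out::println);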
@Transactional
public void upsertPackages(Stream<Package> packages) {
Iterator<List<Package>> partitions = Iterators.partition(packages.iterator(), BATCH_SIZE);
partitions.forEachRemaining(this::upsertPackages);
}
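The per-batch overload that forEachRemaining delegates to is not shown; a minimal sketch, assuming a Spring Data style repository (the packageRepository field and its saveAll call are assumptions, not from the source):
// Hypothetical per-batch overload; packageRepository and saveAll(...) are illustrative only.
private void upsertPackages(List<Package> batch) {
    packageRepository.saveAll(batch);
}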
@Override
public Stream<Entity> findAll(Stream<Object> ids, Fetch fetch) {
Iterator<List<Object>> batches = Iterators.partition(ids.iterator(), FIND_ALL_BATCH_SIZE);
Iterable<List<Object>> iterable = () -> batches;
return stream(iterable).flatMap(batch -> stream(findAllBatched(batch, fetch)));
}
/**
* Convert the given message iterator to an iterator of batches of a specific size.
* <p>
* This is an attempt to reduce the maximum amount of live memory required at any single
* time.
*
* @param iterator Iterator to convert into batches.
* @return an iterator over batches of at most {@code batchSize} messages each
*/
private Iterator<List<KeyedMessage<Integer, byte[]>>> toBatches(
final Iterator<KeyedMessage<Integer, byte[]>> iterator
) {
return Iterators.partition(iterator, batchSize);
}
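A sketch of how such a batch iterator might be consumed (the producer field and its list-accepting send overload are assumptions based on the legacy kafka.javaapi.producer API, not part of the original snippet):
// Assumed consumption pattern: drain one batch at a time so that at most
// batchSize messages are materialized in memory per producer call.
Iterator<List<KeyedMessage<Integer, byte[]>>> batches = toBatches(messages);
while (batches.hasNext()) {
    producer.send(batches.next());
}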