下面列出了 com.google.common.collect.Iterators#advance() 的实例代码，您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Sends a single page of {@code results} to {@code audience}.
 *
 * <p>An empty collection, or a page number outside {@code 1..pages}, yields exactly one
 * warning message and nothing else. Otherwise the page header is sent first, followed by
 * the entries whose zero-based positions belong to the requested page.
 *
 * @param audience recipient of every message produced here
 * @param results  the complete result set being paginated
 * @param page     one-based number of the page to show
 */
public void display(Audience audience, Collection<? extends T> results, int page) {
    if(results.isEmpty()) {
        audience.sendMessage(new WarningComponent("command.error.emptyResult"));
        return;
    }

    final int total = results.size();
    final int pageCount = Numbers.divideRoundingUp(total, perPage);
    final boolean pageInRange = page >= 1 && page <= pageCount;
    if(!pageInRange) {
        audience.sendMessage(new WarningComponent("command.error.invalidPage", String.valueOf(page), String.valueOf(pageCount)));
        return;
    }

    // Half-open window [from, to) of zero-based positions shown on this page.
    final int from = perPage * (page - 1);
    final int to = Math.min(perPage * page, total);

    audience.sendMessage(header(page, pageCount));

    if(!(results instanceof List)) {
        // No positional access: skip ahead with the iterator and continue from wherever it landed.
        final Iterator<? extends T> cursor = results.iterator();
        int position = Iterators.advance(cursor, from);
        while(position < to) {
            audience.sendMessages(multiEntry(cursor.next(), position));
            position++;
        }
        return;
    }

    // Lists are read by index.
    final List<? extends T> indexed = (List<? extends T>) results;
    int position = from;
    while(position < to) {
        audience.sendMessages(multiEntry(indexed.get(position), position));
        position++;
    }
}
/**
 * Wraps an iterator so that skipping its first elements is deferred until the wrapper is
 * actually used. This is a lazy counterpart of
 * {@link com.google.common.collect.Iterators#advance}.
 *
 * @param backingIterator the iterator to read from; it is not touched until the first call
 *                        to {@code hasNext()}, {@code next()} or {@code remove()} on the result
 * @param numberToAdvance how many elements to skip on first use
 * @return an iterator that delegates to {@code backingIterator} after skipping
 *         {@code numberToAdvance} elements on first use
 */
public static <T> Iterator<T> advanced(Iterator<T> backingIterator, int numberToAdvance) {
    return new Iterator<T>() {
        /** True until the initial skip has completed. */
        private boolean skipPending = true;

        /** Performs the one-time skip if it is still outstanding, then hands back the delegate. */
        private Iterator<T> positioned() {
            if (skipPending) {
                Iterators.advance(backingIterator, numberToAdvance);
                skipPending = false;
            }
            return backingIterator;
        }

        @Override
        public boolean hasNext() {
            return positioned().hasNext();
        }

        @Override
        public T next() {
            return positioned().next();
        }

        @Override
        public void remove() {
            positioned().remove();
        }
    };
}
/**
 * Returns an iterator over the requested window of the model's collection.
 *
 * <p>The collection is narrowed by {@code filterPredicate} when one is set, ordered by the
 * current sort property when one is set (nulls first, reversed for a descending sort), then
 * advanced past {@code first} elements and limited to {@code count} elements.
 *
 * @param first number of leading elements to skip; values {@code <= 0} skip nothing
 * @param count maximum number of elements to return; a negative value means no limit
 * @return iterator over the selected elements, or an empty iterator when the model holds
 *         {@code null} or an empty collection
 */
@Override
public Iterator<? extends T> iterator(long first, long count) {
    Collection<T> data = dataModel.getObject();
    if(data==null || data.isEmpty()) return Collections.emptyIterator();

    // Filtered view of the model data; both the sorted copy and the plain iterator below are
    // taken from this view, so the elements do not need to be filtered a second time.
    if(filterPredicate!=null) data = Collections2.filter(data, filterPredicate);

    Iterator<T> it;
    final SortParam<S> sortParam = getSort();
    if(sortParam!=null && sortParam.getProperty()!=null)
    {
        Ordering<T> ordering = Ordering.natural().nullsFirst().onResultOf(new Function<T, Comparable<?>>() {
            @Override
            public Comparable<?> apply(T input) {
                return comparableValue(input, sortParam.getProperty());
            }
        });
        if(!sortParam.isAscending()) ordering = ordering.reverse();
        it=ordering.sortedCopy(data).iterator();
    }
    else
    {
        it=data.iterator();
    }

    // Iterators.advance/limit take an int. Clamp instead of casting so that a long above
    // Integer.MAX_VALUE cannot wrap around to a negative or unrelated small value.
    if(first>0) Iterators.advance(it, (int)Math.min(first, Integer.MAX_VALUE));
    return count>=0?Iterators.limit(it, (int)Math.min(count, Integer.MAX_VALUE)):it;
}
/**
 * Fetches the value of the cell at zero-based position {@code index} in {@code iter}.
 *
 * @param iter  iterator over the cells; it is consumed up to and including the returned cell
 * @param index zero-based position of the wanted cell
 * @return the value of that cell, or {@code null} when the iterator holds {@code index} or
 *         fewer cells
 */
static ByteBuffer getListItem(Iterator<Cell> iter, int index) throws InvalidRequestException
{
    final int skipped = Iterators.advance(iter, index);
    final boolean present = skipped == index && iter.hasNext();
    return present ? iter.next().value() : null;
}
/**
 * Checks how the access time and modified time of the directory {@code /foo/bar} respond to
 * creating, listing, moving and deleting a file inside it.
 *
 * <p>Each step is preceded by a 1 ms sleep; presumably this lets the file times differ
 * between steps (see the TODO below) - confirm against the file system's clock handling.
 */
@Test
public void testDirectoryAccessAndModifiedTimeUpdates() throws IOException {
Files.createDirectories(path("/foo/bar"));
FileTimeTester tester = new FileTimeTester(path("/foo/bar"));
// Nothing has been done to the directory since the tester was created.
tester.assertAccessTimeDidNotChange();
tester.assertModifiedTimeDidNotChange();
// TODO(cgdecker): Use a Clock for file times so I can test this reliably without sleeping
Uninterruptibles.sleepUninterruptibly(1, MILLISECONDS);
// Creating a file in the directory: modified time is expected to change, access time is not.
Files.createFile(path("/foo/bar/baz.txt"));
tester.assertAccessTimeDidNotChange();
tester.assertModifiedTimeChanged();
Uninterruptibles.sleepUninterruptibly(1, MILLISECONDS);
// access time is updated by reading the full contents of the directory
// not just by doing a lookup in it
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path("/foo/bar"))) {
// iterate the stream, forcing the directory to actually be read
// (Integer.MAX_VALUE simply asks advance() to consume every remaining entry)
Iterators.advance(stream.iterator(), Integer.MAX_VALUE);
}
tester.assertAccessTimeChanged();
tester.assertModifiedTimeDidNotChange();
Uninterruptibles.sleepUninterruptibly(1, MILLISECONDS);
// Renaming a file within the directory: modified time is expected to change, access time is not.
Files.move(path("/foo/bar/baz.txt"), path("/foo/bar/baz2.txt"));
tester.assertAccessTimeDidNotChange();
tester.assertModifiedTimeChanged();
Uninterruptibles.sleepUninterruptibly(1, MILLISECONDS);
// Deleting a file from the directory: modified time is expected to change, access time is not.
Files.delete(path("/foo/bar/baz2.txt"));
tester.assertAccessTimeDidNotChange();
tester.assertModifiedTimeChanged();
}
/**
 * Writes four deltas with distinct content to one row and compacts it three times, each time
 * under a different full consistency timestamp (FCT): before every delta, between the second
 * and third delta, and after the first compaction record. After each compaction the test
 * inspects the compaction entries returned by {@code passOneIterator()} and the clustering
 * keys derived from {@code passTwoIterator()}.
 */
@Test
public void testPartialCompactionWithNoRedundancy() throws Exception {
// Build a data store backed entirely by in-memory / discarding collaborators.
InMemoryDataReaderDAO dataDao = new InMemoryDataReaderDAO();
InMemoryTableDAO tableDao = new InMemoryTableDAO();
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), tableDao, dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.<URI>absent(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.<String, Object>emptyMap(), newAudit("create table"));
Table table = tableDao.get(TABLE);
// Set the full consistency timestamp before the first delta
dataDao.setFullConsistencyTimestamp(1408977300000L);
// Create an update where there are no redundant deltas
// (change ids are time UUIDs spaced 10 seconds apart, starting 10 seconds after the FCT above)
DeltaClusteringKey unique0 = new DeltaClusteringKey(TimeUUIDs.uuidForTimeMillis(1408977310000L), 1);
DeltaClusteringKey unique1 = new DeltaClusteringKey(TimeUUIDs.uuidForTimeMillis(1408977320000L), 1);
DeltaClusteringKey unique2 = new DeltaClusteringKey(TimeUUIDs.uuidForTimeMillis(1408977330000L), 1);
DeltaClusteringKey unique3 = new DeltaClusteringKey(TimeUUIDs.uuidForTimeMillis(1408977340000L), 1);
store.update(TABLE, KEY, unique0.getChangeId(), Deltas.fromString("{\"name\":\"Bob\"}"), newAudit("submit"), WriteConsistency.STRONG);
store.update(TABLE, KEY, unique1.getChangeId(), Deltas.fromString("{\"name\":\"Carol\"}"), newAudit("resubmit"), WriteConsistency.STRONG);
store.update(TABLE, KEY, unique2.getChangeId(), Deltas.fromString("{\"name\":\"Ted\"}"), newAudit("resubmit"), WriteConsistency.STRONG);
store.update(TABLE, KEY, unique3.getChangeId(), Deltas.fromString("{\"name\":\"Alice\"}"), newAudit("resubmit"), WriteConsistency.STRONG);
// Set the full consistency timestamp such that no compaction will take place
dataDao.setFullConsistencyTimestamp(1408977300000L);
store.compact(TABLE, KEY, null, ReadConsistency.STRONG, WriteConsistency.STRONG);
Record record = dataDao.read(new Key(table, KEY), ReadConsistency.STRONG);
// No compaction entries yet, and all four deltas are still present.
assertFalse(record.passOneIterator().hasNext());
assertEquals(ImmutableList.of(unique0, unique1, unique2, unique3), toClusteringKeys(record.passTwoIterator()));
// Set the full consistency timestamp so that only the first records are compacted
// (the new FCT lies between the timestamps of unique1 and unique2)
dataDao.setFullConsistencyTimestamp(1408977325000L);
store.compact(TABLE, KEY, null, ReadConsistency.STRONG, WriteConsistency.STRONG);
record = dataDao.read(new Key(table, KEY), ReadConsistency.STRONG);
// Exactly one compaction entry is expected; getOnlyElement fails otherwise.
Map.Entry<DeltaClusteringKey, Compaction> compactionEntry = Iterators.getOnlyElement(record.passOneIterator());
Compaction compaction = compactionEntry.getValue();
assertEquals(unique0.getChangeId(), compaction.getFirst());
assertEquals(unique1.getChangeId(), compaction.getCutoff());
assertEquals(unique1.getChangeId(), compaction.getLastMutation());
// Deltas will not get deleted since compaction is still out of FCT. For this test, we don't need the deltas to be deleted.
assertEquals(toClusteringKeys(record.passTwoIterator()), ImmutableList.of(unique0, unique1, unique2, unique3, compactionEntry.getKey()));
// Repeat again such that all deltas are compacted
// (FCT is moved to 2 seconds past the time of the change id following the first compaction's change id)
dataDao.setFullConsistencyTimestamp(TimeUUIDs.getTimeMillis(TimeUUIDs.getNext(compactionEntry.getKey().getChangeId())) + 2000L);
store.compact(TABLE, KEY, null, ReadConsistency.STRONG, WriteConsistency.STRONG);
record = dataDao.read(new Key(table, KEY), ReadConsistency.STRONG);
// We still keep the last compaction around since the new owning compaction will be out of FCT.
// Iterators.advance returns how many elements it actually skipped, so asking for 10 counts the
// compaction entries as long as there are at most 10 of them.
int numOfCompactions = Iterators.advance(record.passOneIterator(), 10);
assertEquals(numOfCompactions, 2, "Expect 2 compactions. The more recent is the effective one, " +
"but we defer the owned compaction until later");
DeltaClusteringKey oldCompactionKey = compactionEntry.getKey();
record = dataDao.read(new Key(table, KEY), ReadConsistency.STRONG);
// Pick the compaction entry that is not the one created by the previous compact() call.
Map.Entry<DeltaClusteringKey, Compaction> latestCompactionEntry = Iterators.getOnlyElement(
Iterators.filter(record.passOneIterator(), input -> !input.getKey().equals(oldCompactionKey)));
compaction = latestCompactionEntry.getValue();
assertEquals(unique0.getChangeId(), compaction.getFirst());
assertEquals(unique3.getChangeId(), compaction.getCutoff());
assertEquals(unique3.getChangeId(), compaction.getLastMutation());
assertEquals(toClusteringKeys(record.passTwoIterator()), ImmutableList.of(unique2, unique3, oldCompactionKey, latestCompactionEntry.getKey()),
"Expecting unique2, and unique3 deltas");
}
/**
 * Writes six deltas to one row, the last four of which repeat the content of the second, and
 * compacts the row three times under different full consistency timestamps (FCT). After each
 * compaction the test inspects the compaction entries returned by {@code passOneIterator()},
 * the change ids derived from {@code passTwoIterator()}, and (after the second and third
 * compaction) calls {@code assertRedundantDelta} for each repeated delta.
 */
@Test
public void testPartialCompactionWithRedundancy() throws Exception {
// Build a data store backed entirely by in-memory / discarding collaborators.
InMemoryDataReaderDAO dataDao = new InMemoryDataReaderDAO();
InMemoryTableDAO tableDao = new InMemoryTableDAO();
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), tableDao, dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.<URI>absent(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.<String, Object>emptyMap(), newAudit("create table"));
Table table = tableDao.get(TABLE);
// Set the full consistency timestamp before the first delta
dataDao.setFullConsistencyTimestamp(1408977300000L);
// Create an update where the last four updates are redundant
// (change ids are time UUIDs spaced 10 seconds apart, starting 10 seconds after the FCT above)
UUID unique0 = TimeUUIDs.uuidForTimeMillis(1408977310000L);
UUID unique1 = TimeUUIDs.uuidForTimeMillis(1408977320000L);
UUID redund0 = TimeUUIDs.uuidForTimeMillis(1408977330000L);
UUID redund1 = TimeUUIDs.uuidForTimeMillis(1408977340000L);
UUID redund2 = TimeUUIDs.uuidForTimeMillis(1408977350000L);
UUID redund3 = TimeUUIDs.uuidForTimeMillis(1408977360000L);
store.update(TABLE, KEY, unique0, Deltas.fromString("{\"name\":\"Bob\"}"), newAudit("submit"), WriteConsistency.STRONG);
store.update(TABLE, KEY, unique1, Deltas.fromString("{\"name\":\"Ted\"}"), newAudit("resubmit"), WriteConsistency.STRONG);
store.update(TABLE, KEY, redund0, Deltas.fromString("{\"name\":\"Ted\"}"), newAudit("resubmit"), WriteConsistency.STRONG);
store.update(TABLE, KEY, redund1, Deltas.fromString("{\"name\":\"Ted\"}"), newAudit("resubmit"), WriteConsistency.STRONG);
store.update(TABLE, KEY, redund2, Deltas.fromString("{\"name\":\"Ted\"}"), newAudit("resubmit"), WriteConsistency.STRONG);
store.update(TABLE, KEY, redund3, Deltas.fromString("{\"name\":\"Ted\"}"), newAudit("resubmit"), WriteConsistency.STRONG);
// Set the full consistency timestamp such that no compaction will take place
dataDao.setFullConsistencyTimestamp(1408977300000L);
store.compact(TABLE, KEY, null, ReadConsistency.STRONG, WriteConsistency.STRONG);
Record record = dataDao.read(new Key(table, KEY), ReadConsistency.STRONG);
// No compaction entries yet, and all six deltas are still present.
assertFalse(record.passOneIterator().hasNext());
assertEquals(ImmutableList.of(unique0, unique1, redund0, redund1, redund2, redund3), toChangeIds(record.passTwoIterator()));
// Set the full consistency timestamp so that only the first two redundant records are compacted
// (the new FCT lies between the timestamps of redund1 and redund2)
dataDao.setFullConsistencyTimestamp(1408977345000L);
store.compact(TABLE, KEY, null, ReadConsistency.STRONG, WriteConsistency.STRONG);
record = dataDao.read(new Key(table, KEY), ReadConsistency.STRONG);
// Exactly one compaction entry is expected; getOnlyElement fails otherwise.
Map.Entry<DeltaClusteringKey, Compaction> compactionEntry = Iterators.getOnlyElement(record.passOneIterator());
Compaction compaction = compactionEntry.getValue();
// The cutoff is the last delta inside the FCT (redund1), while the last mutation is expected
// to remain unique1.
assertEquals(unique0, compaction.getFirst());
assertEquals(redund1, compaction.getCutoff());
assertEquals(unique1, compaction.getLastMutation());
assertEquals(ImmutableList.of(unique0, unique1, redund0, redund1, redund2, redund3, compactionEntry.getKey().getChangeId()), toChangeIds(record.passTwoIterator()));
assertRedundantDelta(store, TABLE, KEY, redund0);
assertRedundantDelta(store, TABLE, KEY, redund1);
assertRedundantDelta(store, TABLE, KEY, redund2);
assertRedundantDelta(store, TABLE, KEY, redund3);
// Repeat again such that all deltas are compacted
// (FCT is moved to 2 seconds past the time of the change id following the first compaction's change id)
dataDao.setFullConsistencyTimestamp(TimeUUIDs.getTimeMillis(TimeUUIDs.getNext(compactionEntry.getKey().getChangeId())) + 2000L);
store.compact(TABLE, KEY, null, ReadConsistency.STRONG, WriteConsistency.STRONG);
record = dataDao.read(new Key(table, KEY), ReadConsistency.STRONG);
// We still keep the last compaction around since the new owning compaction will be out of FCT.
// Iterators.advance returns how many elements it actually skipped, so asking for 10 counts the
// compaction entries as long as there are at most 10 of them.
int numOfCompactions = Iterators.advance(record.passOneIterator(), 10);
assertEquals(numOfCompactions, 2, "Expect 2 compactions. The more recent is the effective one, " +
"but we defer the owned compaction until later");
UUID oldCompactionKey = compactionEntry.getKey().getChangeId();
// Pick the compaction entry that is not the one created by the previous compact() call.
// NOTE(review): unlike the sibling no-redundancy test, the record is not re-read before
// passOneIterator() is called again - presumably each call yields a fresh iterator; confirm.
Map.Entry<DeltaClusteringKey, Compaction> latestCompactionEntry = Iterators.getOnlyElement(
Iterators.filter(record.passOneIterator(), input -> !input.getKey().getChangeId().equals(oldCompactionKey)));
compaction = latestCompactionEntry.getValue();
assertEquals(unique0, compaction.getFirst());
assertEquals(redund3, compaction.getCutoff());
assertEquals(unique1, compaction.getLastMutation());
assertEquals(ImmutableList.of(redund2, redund3, oldCompactionKey, latestCompactionEntry.getKey().getChangeId()), toChangeIds(record.passTwoIterator()));
assertRedundantDelta(store, TABLE, KEY, redund0);
assertRedundantDelta(store, TABLE, KEY, redund1);
assertRedundantDelta(store, TABLE, KEY, redund2);
assertRedundantDelta(store, TABLE, KEY, redund3);
}
/**
 * If a row gets compacted after a long time, then it is possible that we will try to archive a lot of deltas.
 * In this scenario, it is possible that the collective delta history we will be trying to write out to Cassandra
 * will exceed the thrift limit. We should give up on archiving delta history in that scenario.
 *
 * <p>The test writes one delta carrying a 1 MB text field followed by a series of small deltas, compacts the
 * row, and then asserts that the history store holds no delta histories for the row and that the store's
 * {@code _archiveDeltaSize} counter reads 0.
 */
@Test
public void testDeltaHistoryDisabledIfTooLarge() {
// Add a delta that is 1MB, which is small enough to pass the delta size test.
// Then, add very small deltas 10 times, which means it will be over 10 MB thrift limit by the time of compaction,
// and delta history should be disabled
// Setup
MetricRegistry metricRegistry = new MetricRegistry();
MultiDCDataStores allDCs = new MultiDCDataStores(1, metricRegistry);
DataStore dc1 = allDCs.dc(0);
InMemoryDataReaderDAO dao1 = allDCs.dao(0);
// Reset the archivedDeltaSize static counter to zero - This may have accrued some value due to our use of
// DiscardingExecutorService in other tests.
// NOTE(review): no statement here actually resets a counter - presumably building a fresh
// MetricRegistry / MultiDCDataStores above is what provides the clean counter; confirm.
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
dc1.createTable(TABLE, options, Collections.<String, Object>emptyMap(), newAudit("create table"));
long bufferSize = 1 * 1024L * 1024L; // 1 MB
// Build a string of bufferSize 'a' characters to use as the large field value.
StringBuilder sb = new StringBuilder((int) bufferSize);
for (int i=0; i < bufferSize; i++) {
sb.append("a");
}
dc1.update(TABLE, KEY2, TimeUUIDs.newUUID(), Deltas.fromString(format("{..,\"bigtextfield\":\"%s\"}", sb.toString())),
newAudit("moderate"), WriteConsistency.STRONG);
// Now we simply add the small deltas
// NOTE(review): this loop runs 11 times although the surrounding comments speak of 10 small
// deltas - confirm which count is intended.
for (int i = 0; i < 11; i++) {
dc1.update(TABLE, KEY2, TimeUUIDs.newUUID(), Deltas.fromString("{..,\"newcodes\":\"ss\"}"),
newAudit("moderate"), WriteConsistency.STRONG);
}
// Update FullConsistency time again
dao1.setFullConsistencyTimestamp(SystemClock.tick());
SystemClock.tick();
// Now make sure that there are no audited deltas since the overall content would be 10MB or more
// Compaction will occur, but due to transport size limit, it should not save the history
dc1.compact(TABLE, KEY2, null, ReadConsistency.STRONG, WriteConsistency.STRONG);
// Iterators.advance returns the number of elements it skipped, so asking for Integer.MAX_VALUE
// drains the iterator and yields the number of delta histories stored for the row.
int countOfDeltasAfterHugeDelta = Iterators.advance(allDCs.historyStore(0).getDeltaHistories(TABLE, KEY2), Integer.MAX_VALUE);
// Since we know that our first delta was 1 MB, the next 10 deltas each will have the content size of at least 1 MB
// for a total of 10 MB. This is because we keep a "snapshot" of row with each delta archive.
// Verify that we have not saved any delta history for this compaction.
assertEquals(countOfDeltasAfterHugeDelta, 0,
"There should be no audited deltas since, the total size of delta history has crossed thrift limit");
// Make sure that the archived delta size goes back to 0
assertEquals(((DefaultDataStore)dc1)._archiveDeltaSize.getCount(), 0L, "The archived delta counter should be back to 0");
}