下面列出了 com.mongodb.CursorType API 类的示例代码及用法,也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a collection named after the running test method and prepares a
 * tailable find over the oplog collection of the {@code local} database.
 *
 * <p>Waits until more than one second has elapsed after the collection is
 * created before {@code initialTs} is taken, so that the initial timestamp
 * falls after the creation of the collection.
 *
 * @throws Exception declared for the test framework; propagated from any call in the body
 */
@Before
public void createCollection() throws Exception {
    MongoDatabase db = mongoClient.getDatabase(DATABASE);
    testCollectionName = name.getMethodName();
    db.createCollection(testCollectionName);
    final long currentTime = System.currentTimeMillis();
    // To make sure that the oplog is read on each method after we created the above collection,
    // we let the current second pass before we get the initial timestamp seconds.
    // The elapsed-time check must be re-evaluated on every poll: wrapping the comparison in a
    // freshly constructed AtomicBoolean captures its value once (false), and since nothing
    // ever updates that flag, waiting on it can never succeed.
    Awaitility.await().until(() -> (System.currentTimeMillis() - currentTime) > 1000);
    // So we can skip old oplogs and just start with whatever this test is producing.
    initialTs = getInitialTsFromCurrentTime();
    testDocuments = db.getCollection(testCollectionName);
    mongoCursorFindIterable = mongoClient.getDatabase("local").getCollection(OPLOG_COLLECTION)
        .find()
        // As the collection is a capped collection we use a Tailable cursor, which will return
        // results in natural order, in this case based on the ts timestamp field.
        // Tailable Await does not return and blocks, so we are using Tailable.
        .cursorType(CursorType.Tailable);
}
/**
 * Opens a cursor over the {@code OPLOG_RS} collection of the {@code LOCAL} database.
 *
 * @param client client used to look up the database
 * @param query  filter handed to {@code find}
 * @return the iterator of a find configured with {@link CursorType#TailableAwait}
 *         and {@code oplogReplay(true)}
 */
private MongoCursor<Document> getReplicaCursor(MongoClient client, Document query) {
    final MongoCollection<Document> oplogCollection = client.getDatabase(LOCAL).getCollection(OPLOG_RS);
    return oplogCollection
        .find(query)
        .cursorType(CursorType.TailableAwait)
        .oplogReplay(true)
        .iterator();
}
/**
 * Builds the find for one page of {@code oplog} entries matching {@code query}.
 *
 * <p>The result is sorted by {@code $natural} ascending, skips {@code page * batchSize}
 * entries, is limited to {@code batchSize} entries, includes only the
 * {@code ts}, {@code op}, {@code ns} and {@code o} fields, and uses a
 * {@link CursorType#TailableAwait} cursor.
 *
 * @param page page index, multiplied by {@code batchSize} to obtain the skip count
 * @return the configured, not yet iterated, find
 */
private FindIterable<Document> find(int page) {
    return oplog.find(query)
        .sort(new Document("$natural", 1))
        .skip(page * batchSize)
        .limit(batchSize)
        .projection(Projections.include("ts", "op", "ns", "o"))
        .cursorType(CursorType.TailableAwait);
}
/**
 * Replaces {@code cursor} with a new tailable cursor over {@code mongoCollection}.
 *
 * <p>A "greater than" condition on {@code TIMESTAMP_FIELD} is applied only when
 * {@code timestampSeconds > 0} and {@code ordinal >= 0}. When {@code filterOplogTypes}
 * is not empty, an OR over its distinct op types (matched on {@code OP_TYPE_FIELD})
 * is applied as well. All conditions present are combined with AND.
 *
 * @param timestampSeconds seconds part of the offset timestamp
 * @param ordinal          ordinal part of the offset timestamp
 * @param filterOplogTypes op types to restrict the cursor to; duplicates are ignored
 * @param batchSize        batch size set on the find
 */
private void prepareCursor(int timestampSeconds, int ordinal, List<OplogOpType> filterOplogTypes, int batchSize) {
    LOG.debug("Getting new cursor with offset - TimeStampInSeconds:'{}', Ordinal : '{}' and Batch Size : '{}'",timestampSeconds, ordinal, batchSize);
    // The collection is capped, so a Tailable cursor returns results in natural order
    // (here based on the ts timestamp field). Tailable Await blocks instead of returning,
    // hence plain Tailable.
    final FindIterable<Document> unfiltered = mongoCollection
        .find()
        .cursorType(CursorType.Tailable)
        .batchSize(batchSize);

    final List<Bson> conditions = new ArrayList<>();

    // Only constrain on the timestamp when an offset was actually supplied.
    if (timestampSeconds > 0 && ordinal >= 0) {
        final BsonTimestamp offset = new BsonTimestamp(timestampSeconds, ordinal);
        conditions.add(Filters.gt(TIMESTAMP_FIELD, offset));
    }

    if (!filterOplogTypes.isEmpty()) {
        final Set<OplogOpType> seen = new HashSet<>();
        final List<Bson> opTypeConditions = new ArrayList<>();
        for (OplogOpType requested : filterOplogTypes) {
            if (!seen.add(requested)) {
                // Already has an equality condition; skip the duplicate.
                continue;
            }
            opTypeConditions.add(Filters.eq(OP_TYPE_FIELD, requested.getOp()));
        }
        // Any one of the requested op types may match.
        conditions.add(Filters.or(opTypeConditions));
    }

    // AND the timestamp condition with the op type condition, if there is anything to apply.
    final FindIterable<Document> effective = conditions.isEmpty()
        ? unfiltered
        : unfiltered.filter(Filters.and(conditions));
    cursor = effective.iterator();
}
/**
 * Maps the tailable / await-data flags of the given options to a {@link CursorType}.
 *
 * @param queryOptions options whose {@code isTailable()} and {@code isAwaitData()} are consulted
 * @return {@link CursorType#NonTailable} when not tailable; otherwise
 *         {@link CursorType#TailableAwait} when await-data is set, else {@link CursorType#Tailable}
 */
private CursorType toCursorType(QueryOptions queryOptions) {
    if (queryOptions.isTailable()) {
        return queryOptions.isAwaitData() ? CursorType.TailableAwait : CursorType.Tailable;
    }
    return CursorType.NonTailable;
}
/**
 * Checks that a {@link CursorType#Tailable} cursor over {@code CappedPic} keeps yielding
 * entities while a background task saves a new one every 100 ms.
 *
 * <p>The background executor is always shut down, including when a wait times out or an
 * assertion fails, so the writer task does not keep running after this test has finished.
 */
@Test
public void testTailableCursors() {
    getMapper().map(CappedPic.class);
    final Datastore ds = getDs();
    ds.ensureCaps();
    final Query<CappedPic> query = ds.find(CappedPic.class);
    final List<CappedPic> found = new ArrayList<>();
    final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
    try {
        assertEquals(0, query.count());

        // Writer: saves one CappedPic immediately and then one every 100 ms.
        executorService.scheduleAtFixedRate(
            () -> ds.save(new CappedPic()), 0, 100, TimeUnit.MILLISECONDS);

        // Do not open the cursor until at least one entity has been stored.
        Awaitility
            .await()
            .atMost(10, TimeUnit.SECONDS)
            .until(() -> getDs().find(CappedPic.class).count() > 0);

        final Iterator<CappedPic> tail = query.iterator(new FindOptions()
            .cursorType(CursorType.Tailable));

        // Each poll pulls at most one entity from the cursor; done once ten were collected.
        Awaitility
            .await()
            .pollDelay(500, TimeUnit.MILLISECONDS)
            .atMost(10, TimeUnit.SECONDS)
            .until(() -> {
                if (tail.hasNext()) {
                    found.add(tail.next());
                }
                return found.size() >= 10;
            });
    } finally {
        // Stop the writer on every exit path, not only on success.
        executorService.shutdownNow();
    }
    Assert.assertTrue(found.size() >= 10);
    Assert.assertTrue(query.count() >= 10);
}
/**
 * Checks that a {@link CursorType#Tailable} cursor obtained through {@code query.execute}
 * keeps yielding {@code CappedPic} entities while a background task saves a new one
 * every 100 ms.
 *
 * <p>The background executor is always shut down, including when a wait times out or an
 * assertion fails, so the writer task does not keep running after this test has finished.
 */
@Test
public void testTailableCursors() {
    getMapper().map(CappedPic.class);
    final Datastore ds = getDs();
    ds.ensureCaps();
    final Query<CappedPic> query = ds.find(CappedPic.class);
    final List<CappedPic> found = new ArrayList<>();
    final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
    try {
        assertEquals(0, query.count());

        // Writer: saves one CappedPic immediately and then one every 100 ms.
        executorService.scheduleAtFixedRate(
            () -> ds.save(new CappedPic()), 0, 100, TimeUnit.MILLISECONDS);

        // Do not open the cursor until at least one entity has been stored.
        Awaitility
            .await()
            .atMost(10, TimeUnit.SECONDS)
            .until(() -> getDs().find(CappedPic.class).count() > 0);

        final Iterator<CappedPic> tail = query.execute(new FindOptions()
            .cursorType(CursorType.Tailable));

        // Each poll pulls at most one entity from the cursor; done once ten were collected.
        Awaitility
            .await()
            .pollDelay(500, TimeUnit.MILLISECONDS)
            .atMost(10, TimeUnit.SECONDS)
            .until(() -> {
                if (tail.hasNext()) {
                    found.add(tail.next());
                }
                return found.size() >= 10;
            });
    } finally {
        // Stop the writer on every exit path, not only on success.
        executorService.shutdownNow();
    }
    Assert.assertTrue(found.size() >= 10);
    Assert.assertTrue(query.count() >= 10);
}
/**
 * Returns the cursor type held by this instance.
 *
 * @return the current value of the {@code cursorType} field
 */
public CursorType getCursorType() {
    return cursorType;
}