以下列出了 com.mongodb.client.model.changestream.FullDocument 类的 API 用法示例代码;也可以点击链接前往 GitHub 查看源代码。
/**
 * Checks how the source config maps the {@code FULL_DOCUMENT_CONFIG} setting onto an
 * {@code Optional<FullDocument>}: absent or empty values give an empty Optional, the two
 * {@link FullDocument} values used below round-trip through {@code getValue()}, and an unknown
 * value is rejected via {@code assertInvalid}.
 */
@Test
@DisplayName("test fullDocument")
void testFullDocument() {
  assertAll(
      "fullDocument checks",
      () -> {
        // Nothing configured at all.
        Optional<FullDocument> unset = createSourceConfig().getFullDocument();
        assertEquals(Optional.empty(), unset);
      },
      () -> {
        // Explicitly configured as the empty string.
        Optional<FullDocument> blank =
            createSourceConfig(FULL_DOCUMENT_CONFIG, "").getFullDocument();
        assertEquals(Optional.empty(), blank);
      },
      () -> {
        Optional<FullDocument> asDefault =
            createSourceConfig(FULL_DOCUMENT_CONFIG, FullDocument.DEFAULT.getValue())
                .getFullDocument();
        assertEquals(Optional.of(FullDocument.DEFAULT), asDefault);
      },
      () -> {
        Optional<FullDocument> asLookup =
            createSourceConfig(FULL_DOCUMENT_CONFIG, FullDocument.UPDATE_LOOKUP.getValue())
                .getFullDocument();
        assertEquals(Optional.of(FullDocument.UPDATE_LOOKUP), asLookup);
      },
      () -> {
        // A value that is not a known FullDocument string must fail validation.
        assertInvalid(FULL_DOCUMENT_CONFIG, "madeUp");
      });
}
/**
 * Resolves the change stream {@link FullDocument} option from this configuration.
 *
 * <p>When {@code PUBLISH_FULL_DOCUMENT_ONLY_CONFIG} is enabled the result is always
 * {@link FullDocument#UPDATE_LOOKUP}; the {@code FULL_DOCUMENT_CONFIG} string is not consulted in
 * that case. Otherwise the {@code FULL_DOCUMENT_CONFIG} string is converted with
 * {@code fullDocumentFromString}.
 *
 * @return the resolved option, or whatever {@code fullDocumentFromString} yields for the
 *     configured string
 */
public Optional<FullDocument> getFullDocument() {
  if (!getBoolean(PUBLISH_FULL_DOCUMENT_ONLY_CONFIG)) {
    final String configured = getString(FULL_DOCUMENT_CONFIG);
    return fullDocumentFromString(configured);
  }
  return Optional.of(FullDocument.UPDATE_LOOKUP);
}
/**
 * Converts a configuration string into a {@link FullDocument} value.
 *
 * @param fullDocument the configured value; must not be {@code null} (it is dereferenced
 *     directly)
 * @return an empty Optional for the empty string, otherwise the result of
 *     {@link FullDocument#fromString(String)} wrapped in an Optional
 */
public static Optional<FullDocument> fullDocumentFromString(final String fullDocument) {
  return fullDocument.isEmpty()
      ? Optional.empty()
      : Optional.of(FullDocument.fromString(fullDocument));
}
/**
 * Opens a change stream through each {@code client.watch(...)} call shape used below and asserts
 * that no failure has been recorded.
 *
 * <p>Every subscription routes failures into a shared list via {@code onFailure().invoke(...)}.
 * The CompletionStage returned by {@code subscribeAsCompletionStage()} is discarded each time,
 * so nothing in this method waits for a subscription to finish before the final assertion.
 * NOTE(review): failures arriving after the assertion would presumably go unnoticed - confirm
 * whether that is intended.
 */
@Test
void testThatWatchStreamCanBeConnected() {
// Connection string naming two hosts on localhost.
String cs = "mongodb://localhost:27018,localhost:27019";
client = new ReactiveMongoClientImpl(MongoClients.create(cs));
// Thread-safe list: the failure callbacks may run on threads other than the test thread.
List<Throwable> failures = new CopyOnWriteArrayList<>();
// No arguments.
client.watch().onFailure().invoke(failures::add)
.onItem().ignoreAsUni()
.subscribeAsCompletionStage();
// Document class only.
client.watch(Document.class).onFailure().invoke(failures::add)
.onItem().ignoreAsUni()
.subscribeAsCompletionStage();
// Empty pipeline.
client.watch(Collections.emptyList()).onFailure().invoke(failures::add)
.onItem().ignoreAsUni()
.subscribeAsCompletionStage();
// Empty pipeline plus document class.
client.watch(Collections.emptyList(), Document.class).onFailure().invoke(failures::add)
.onItem().ignoreAsUni()
.subscribeAsCompletionStage();
// Empty pipeline, document class, and null as the third argument.
client.watch(Collections.emptyList(), Document.class, null).onFailure().invoke(failures::add)
.onItem().ignoreAsUni()
.subscribeAsCompletionStage();
// Empty pipeline, document class, and options with a one second max await time.
client.watch(Collections.emptyList(), Document.class,
new ChangeStreamOptions().maxAwaitTime(1, TimeUnit.SECONDS)).onFailure().invoke(failures::add)
.onItem().ignoreAsUni()
.subscribeAsCompletionStage();
// Options only, requesting FullDocument.DEFAULT.
client.watch(new ChangeStreamOptions().fullDocument(FullDocument.DEFAULT))
.onFailure().invoke(failures::add)
.onItem().ignoreAsUni()
.subscribeAsCompletionStage();
// Null options; the cast selects the ChangeStreamOptions overload.
client.watch((ChangeStreamOptions) null).onFailure().invoke(failures::add)
.onItem().ignoreAsUni()
.subscribeAsCompletionStage();
// Same call shape as the second subscription above (document class only).
client.watch(Document.class)
.onFailure().invoke(failures::add)
.onItem().ignoreAsUni()
.subscribeAsCompletionStage();
// Checked immediately after subscribing; see the note in the method documentation.
assertThat(failures).isEmpty();
}
/**
 * Configures {@code changeStreamDocuments}: the change stream this instance reads from.
 *
 * <p>The stream is filtered to the namespaces returned by {@code getNamespaces} and to the
 * operation types insert, delete, replace and update. Its starting operation time comes from
 * {@code registry.votedMongoId(connection)}; when that is the {@code DocTimestamp.earliest}
 * sentinel, the timestamp of the first document returned from the oplog collection is used
 * instead, if there is one.
 *
 * @param connection the connection whose namespaces and voted timestamp are looked up
 * @param registry supplies the namespaces of interest and the voted starting timestamp
 * @param updateLookUp when {@code true} the stream uses {@link FullDocument#UPDATE_LOOKUP},
 *     otherwise {@link FullDocument#DEFAULT}
 */
private void configQuery(MongoConnection connection, ConsumerRegistry registry, boolean updateLookUp) {
  List<Bson> pipeline = singletonList(Aggregates.match(
      Filters.and(
          Filters.in(NS, getNamespaces(connection, registry)),
          Filters.in("operationType", asList("insert", "delete", "replace", "update")))));
  changeStreamDocuments = client.watch(pipeline).batchSize(MONGO_CHANGE_STREAM_BATCH_SIZE);

  DocTimestamp docTimestamp = registry.votedMongoId(connection);
  // Identity comparison: DocTimestamp.earliest is used as a sentinel instance here.
  if (DocTimestamp.earliest == docTimestamp) {
    // The cursor was previously never closed; try-with-resources releases it on every path,
    // including when reading the first oplog entry throws.
    try (MongoCursor<Document> firstLog =
        client.getDatabase(LOCAL).getCollection(OPLOG_RS).find(new Document()).limit(1).iterator()) {
      if (firstLog.hasNext()) {
        Document next = firstLog.next();
        logger.info("Connect to earliest oplog time: {}", next.get(TS));
        changeStreamDocuments.startAtOperationTime(((BsonTimestamp) next.get(TS)));
      } else {
        logger.info("Document not found in local.oplog.rs -- is this a new and empty db instance?");
        changeStreamDocuments.startAtOperationTime(docTimestamp.getTimestamp());
      }
    }
  } else {
    /*
    Optional. The starting point for the change stream.
    If the specified starting point is in the past, it must be in the time range of the oplog.
    To check the time range of the oplog, see rs.printReplicationInfo().
     */
    changeStreamDocuments.startAtOperationTime(docTimestamp.getTimestamp());
  }
  // UPDATE_LOOKUP: return the most current majority-committed version of the updated document.
  // i.e. run at different time, will have different fullDocument
  changeStreamDocuments.fullDocument(updateLookUp ? FullDocument.UPDATE_LOOKUP : FullDocument.DEFAULT);
}
/**
 * Opens a change stream on this backend's collection for the given watch operation.
 *
 * <p>The operation's query is converted with {@code toBsonFilter} and placed under the
 * {@code "fullDocument"} key of a single-element pipeline. The stream is requested with
 * {@link FullDocument#UPDATE_LOOKUP} and typed with the collection's own document class.
 *
 * @param operation the watch operation whose query drives the pipeline
 * @param <X> element type the caller expects; the collection is cast to it unchecked
 * @return a publisher over the change stream, adapted through {@code Flowable.fromPublisher}
 */
private <X> Publisher<X> watch(StandardOperations.Watch operation) {
  final MongoCollection<X> typedCollection = (MongoCollection<X>) this.collection;
  final Bson fullDocumentStage =
      new Document("fullDocument", toBsonFilter(operation.query()));
  return Flowable.fromPublisher(
      typedCollection
          .watch(Collections.singletonList(fullDocumentStage))
          .fullDocument(FullDocument.UPDATE_LOOKUP)
          .withDocumentClass(typedCollection.getDocumentClass()));
}
/**
 * Verifies the driver calls made by {@code MongoSourceTask.createCursor} when both a database
 * and a collection are configured, so the change stream is opened on the collection.
 *
 * <p>Three configurations are exercised in sequence on the same {@code cfgMap}, with
 * {@code resetMocks()} between them: no pipeline, a pipeline, and a pipeline combined with batch
 * size, full document, collation and a stored offset.
 */
@Test
@DisplayName("test creates the expected collection cursor")
void testCreatesExpectedCollectionCursor() {
MongoSourceTask task = new MongoSourceTask();
Map<String, String> cfgMap = new HashMap<>();
cfgMap.put(CONNECTION_URI_CONFIG, "mongodb://localhost");
cfgMap.put(DATABASE_CONFIG, TEST_DATABASE);
cfgMap.put(COLLECTION_CONFIG, TEST_COLLECTION);
MongoSourceConfig cfg = new MongoSourceConfig(cfgMap);
// Phase 1 - no pipeline: stub the chain client -> database -> collection -> watch() -> cursor.
when(mongoClient.getDatabase(TEST_DATABASE)).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(TEST_COLLECTION)).thenReturn(mongoCollection);
when(mongoCollection.watch()).thenReturn(changeStreamIterable);
when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
when(mongoIterable.iterator()).thenReturn(mongoCursor);
task.createCursor(cfg, mongoClient);
// Each step of the chain must have been invoked exactly once.
verify(mongoClient, times(1)).getDatabase(TEST_DATABASE);
verify(mongoDatabase, times(1)).getCollection(TEST_COLLECTION);
verify(mongoCollection, times(1)).watch();
verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
verify(mongoIterable, times(1)).iterator();
// Phase 2 - pipeline configured: watch(pipeline) is expected instead of watch().
resetMocks();
cfgMap.put(PIPELINE_CONFIG, "[{$match: {operationType: 'insert'}}]");
cfg = new MongoSourceConfig(cfgMap);
when(mongoClient.getDatabase(TEST_DATABASE)).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(TEST_COLLECTION)).thenReturn(mongoCollection);
when(mongoCollection.watch(cfg.getPipeline().get())).thenReturn(changeStreamIterable);
when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
when(mongoIterable.iterator()).thenReturn(mongoCursor);
task.createCursor(cfg, mongoClient);
verify(mongoClient, times(1)).getDatabase(TEST_DATABASE);
verify(mongoDatabase, times(1)).getCollection(TEST_COLLECTION);
verify(mongoCollection, times(1)).watch(cfg.getPipeline().get());
verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
verify(mongoIterable, times(1)).iterator();
// Phase 3 - complex: the pipeline from phase 2 is still in cfgMap; batch size, full document
// and collation are added on top of it.
resetMocks();
cfgMap.put(BATCH_SIZE_CONFIG, "101");
FullDocument fullDocument = FullDocument.UPDATE_LOOKUP;
cfgMap.put(FULL_DOCUMENT_CONFIG, fullDocument.getValue());
Collation collation =
Collation.builder()
.locale("en")
.caseLevel(true)
.collationCaseFirst(CollationCaseFirst.OFF)
.collationStrength(CollationStrength.IDENTICAL)
.collationAlternate(CollationAlternate.SHIFTED)
.collationMaxVariable(CollationMaxVariable.SPACE)
.numericOrdering(true)
.normalization(true)
.backwards(true)
.build();
// The collation is passed through the config as its JSON document form.
cfgMap.put(COLLATION_CONFIG, collation.asDocument().toJson());
cfg = new MongoSourceConfig(cfgMap);
// Give the task a context whose offset reader returns OFFSET for this config's partition.
// NOTE(review): OFFSET presumably carries RESUME_TOKEN, given the startAfter expectation - confirm.
task.initialize(context);
when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
when(offsetStorageReader.offset(task.createPartitionMap(cfg))).thenReturn(OFFSET);
when(mongoClient.getDatabase(TEST_DATABASE)).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(TEST_COLLECTION)).thenReturn(mongoCollection);
when(mongoCollection.watch(cfg.getPipeline().get())).thenReturn(changeStreamIterable);
// The option setters are stubbed to return the same iterable so the calls can be chained.
when(changeStreamIterable.batchSize(101)).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(fullDocument)).thenReturn(changeStreamIterable);
when(changeStreamIterable.collation(collation)).thenReturn(changeStreamIterable);
when(changeStreamIterable.startAfter(RESUME_TOKEN)).thenReturn(changeStreamIterable);
when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
when(mongoIterable.iterator()).thenReturn(mongoCursor);
task.createCursor(cfg, mongoClient);
// Every configured option must have been applied to the change stream exactly once.
verify(mongoClient, times(1)).getDatabase(TEST_DATABASE);
verify(mongoDatabase, times(1)).getCollection(TEST_COLLECTION);
verify(mongoCollection, times(1)).watch(cfg.getPipeline().get());
verify(changeStreamIterable, times(1)).batchSize(101);
verify(changeStreamIterable, times(1)).fullDocument(fullDocument);
verify(changeStreamIterable, times(1)).collation(collation);
verify(changeStreamIterable, times(1)).startAfter(RESUME_TOKEN);
verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
verify(mongoIterable, times(1)).iterator();
}
/**
 * Verifies the driver calls made by {@code MongoSourceTask.createCursor} when a database but no
 * collection is configured, so the change stream is opened on the database.
 *
 * <p>Three configurations are exercised in sequence on the same {@code cfgMap}, with
 * {@code resetMocks()} between them: no pipeline, a pipeline, and a pipeline combined with batch
 * size, full document, collation and a stored offset.
 */
@Test
@DisplayName("test creates the expected database cursor")
void testCreatesExpectedDatabaseCursor() {
MongoSourceTask task = new MongoSourceTask();
Map<String, String> cfgMap = new HashMap<>();
cfgMap.put(CONNECTION_URI_CONFIG, "mongodb://localhost");
cfgMap.put(DATABASE_CONFIG, TEST_DATABASE);
MongoSourceConfig cfg = new MongoSourceConfig(cfgMap);
// Phase 1 - no pipeline: stub the chain client -> database -> watch() -> cursor.
when(mongoClient.getDatabase(TEST_DATABASE)).thenReturn(mongoDatabase);
when(mongoDatabase.watch()).thenReturn(changeStreamIterable);
when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
when(mongoIterable.iterator()).thenReturn(mongoCursor);
task.createCursor(cfg, mongoClient);
// Each step of the chain must have been invoked exactly once.
verify(mongoClient, times(1)).getDatabase(TEST_DATABASE);
verify(mongoDatabase, times(1)).watch();
verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
verify(mongoIterable, times(1)).iterator();
// Phase 2 - pipeline configured: watch(pipeline) is expected instead of watch().
resetMocks();
cfgMap.put(PIPELINE_CONFIG, "[{$match: {operationType: 'insert'}}]");
cfg = new MongoSourceConfig(cfgMap);
when(mongoClient.getDatabase(TEST_DATABASE)).thenReturn(mongoDatabase);
when(mongoDatabase.watch(cfg.getPipeline().get())).thenReturn(changeStreamIterable);
when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
when(mongoIterable.iterator()).thenReturn(mongoCursor);
task.createCursor(cfg, mongoClient);
verify(mongoClient, times(1)).getDatabase(TEST_DATABASE);
verify(mongoDatabase, times(1)).watch(cfg.getPipeline().get());
verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
verify(mongoIterable, times(1)).iterator();
// Phase 3 - complex: the pipeline from phase 2 is still in cfgMap; batch size, full document
// and collation are added on top of it.
resetMocks();
cfgMap.put(BATCH_SIZE_CONFIG, "101");
FullDocument fullDocument = FullDocument.UPDATE_LOOKUP;
cfgMap.put(FULL_DOCUMENT_CONFIG, fullDocument.getValue());
Collation collation =
Collation.builder()
.locale("en")
.caseLevel(true)
.collationCaseFirst(CollationCaseFirst.OFF)
.collationStrength(CollationStrength.IDENTICAL)
.collationAlternate(CollationAlternate.SHIFTED)
.collationMaxVariable(CollationMaxVariable.SPACE)
.numericOrdering(true)
.normalization(true)
.backwards(true)
.build();
// The collation is passed through the config as its JSON document form.
cfgMap.put(COLLATION_CONFIG, collation.asDocument().toJson());
cfg = new MongoSourceConfig(cfgMap);
// Give the task a context whose offset reader returns OFFSET for this config's partition.
// NOTE(review): OFFSET presumably carries RESUME_TOKEN, given the startAfter expectation - confirm.
task.initialize(context);
when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
when(offsetStorageReader.offset(task.createPartitionMap(cfg))).thenReturn(OFFSET);
when(mongoClient.getDatabase(TEST_DATABASE)).thenReturn(mongoDatabase);
when(mongoDatabase.watch(cfg.getPipeline().get())).thenReturn(changeStreamIterable);
// The option setters are stubbed to return the same iterable so the calls can be chained.
when(changeStreamIterable.batchSize(101)).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(fullDocument)).thenReturn(changeStreamIterable);
when(changeStreamIterable.collation(collation)).thenReturn(changeStreamIterable);
when(changeStreamIterable.startAfter(RESUME_TOKEN)).thenReturn(changeStreamIterable);
when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
when(mongoIterable.iterator()).thenReturn(mongoCursor);
task.createCursor(cfg, mongoClient);
// Every configured option must have been applied to the change stream exactly once.
verify(mongoClient, times(1)).getDatabase(TEST_DATABASE);
verify(mongoDatabase, times(1)).watch(cfg.getPipeline().get());
verify(changeStreamIterable, times(1)).batchSize(101);
verify(changeStreamIterable, times(1)).fullDocument(fullDocument);
verify(changeStreamIterable, times(1)).collation(collation);
verify(changeStreamIterable, times(1)).startAfter(RESUME_TOKEN);
verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
verify(mongoIterable, times(1)).iterator();
}
/**
 * Verifies the driver calls made by {@code MongoSourceTask.createCursor} when neither a database
 * nor a collection is configured, so the change stream is opened directly on the client.
 *
 * <p>Three configurations are exercised in sequence on the same {@code cfgMap}, with
 * {@code resetMocks()} between them: no pipeline, a pipeline, and a pipeline combined with batch
 * size, full document, collation and a stored offset.
 */
@Test
@DisplayName("test creates the expected client cursor")
void testCreatesExpectedClientCursor() {
MongoSourceTask task = new MongoSourceTask();
Map<String, String> cfgMap = new HashMap<>();
cfgMap.put(CONNECTION_URI_CONFIG, "mongodb://localhost");
MongoSourceConfig cfg = new MongoSourceConfig(cfgMap);
// Phase 1 - no pipeline: stub the chain client -> watch() -> cursor.
when(mongoClient.watch()).thenReturn(changeStreamIterable);
when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
when(mongoIterable.iterator()).thenReturn(mongoCursor);
task.createCursor(cfg, mongoClient);
// Each step of the chain must have been invoked exactly once.
verify(mongoClient, times(1)).watch();
verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
verify(mongoIterable, times(1)).iterator();
// Phase 2 - pipeline configured: watch(pipeline) is expected instead of watch().
resetMocks();
cfgMap.put(PIPELINE_CONFIG, "[{$match: {operationType: 'insert'}}]");
cfg = new MongoSourceConfig(cfgMap);
when(mongoClient.watch(cfg.getPipeline().get())).thenReturn(changeStreamIterable);
when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
when(mongoIterable.iterator()).thenReturn(mongoCursor);
task.createCursor(cfg, mongoClient);
verify(mongoClient, times(1)).watch(cfg.getPipeline().get());
verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
verify(mongoIterable, times(1)).iterator();
// Phase 3 - complex: the pipeline from phase 2 is still in cfgMap; batch size, full document
// and collation are added on top of it.
resetMocks();
cfgMap.put(BATCH_SIZE_CONFIG, "101");
FullDocument fullDocument = FullDocument.UPDATE_LOOKUP;
cfgMap.put(FULL_DOCUMENT_CONFIG, fullDocument.getValue());
Collation collation =
Collation.builder()
.locale("en")
.caseLevel(true)
.collationCaseFirst(CollationCaseFirst.OFF)
.collationStrength(CollationStrength.IDENTICAL)
.collationAlternate(CollationAlternate.SHIFTED)
.collationMaxVariable(CollationMaxVariable.SPACE)
.numericOrdering(true)
.normalization(true)
.backwards(true)
.build();
// The collation is passed through the config as its JSON document form.
cfgMap.put(COLLATION_CONFIG, collation.asDocument().toJson());
cfg = new MongoSourceConfig(cfgMap);
// Give the task a context whose offset reader returns OFFSET for this config's partition.
// NOTE(review): OFFSET presumably carries RESUME_TOKEN, given the startAfter expectation - confirm.
task.initialize(context);
when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
when(offsetStorageReader.offset(task.createPartitionMap(cfg))).thenReturn(OFFSET);
when(mongoClient.watch(cfg.getPipeline().get())).thenReturn(changeStreamIterable);
// The option setters are stubbed to return the same iterable so the calls can be chained.
when(changeStreamIterable.batchSize(101)).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(fullDocument)).thenReturn(changeStreamIterable);
when(changeStreamIterable.collation(collation)).thenReturn(changeStreamIterable);
when(changeStreamIterable.startAfter(RESUME_TOKEN)).thenReturn(changeStreamIterable);
when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
when(mongoIterable.iterator()).thenReturn(mongoCursor);
task.createCursor(cfg, mongoClient);
// Every configured option must have been applied to the change stream exactly once.
verify(mongoClient, times(1)).watch(cfg.getPipeline().get());
verify(changeStreamIterable, times(1)).batchSize(101);
verify(changeStreamIterable, times(1)).fullDocument(fullDocument);
verify(changeStreamIterable, times(1)).collation(collation);
verify(changeStreamIterable, times(1)).startAfter(RESUME_TOKEN);
verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
verify(mongoIterable, times(1)).iterator();
}