Source code examples for the class com.mongodb.client.model.changestream.FullDocument

The examples below show how the com.mongodb.client.model.changestream.FullDocument API is used in real projects; you can also follow the links to view the full source code on GitHub.
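
Before the project snippets, here is a minimal sketch of the basic pattern (the connection string, database, and collection names are placeholders): FullDocument is passed to ChangeStreamIterable.fullDocument(...) to decide whether update events carry the looked-up, majority-committed document.

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.Document;

public class FullDocumentQuickstart {
  public static void main(String[] args) {
    // Placeholder connection string; change streams require a replica set
    // or sharded cluster, not a standalone server.
    try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
      MongoCursor<ChangeStreamDocument<Document>> cursor = client
          .getDatabase("test")
          .getCollection("example")
          .watch()
          // UPDATE_LOOKUP: update events also carry the current
          // majority-committed version of the changed document.
          .fullDocument(FullDocument.UPDATE_LOOKUP)
          .iterator();
      while (cursor.hasNext()) {
        ChangeStreamDocument<Document> event = cursor.next();
        System.out.println(event.getOperationType() + ": " + event.getFullDocument());
      }
    }
  }
}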

Example 1  Project: mongo-kafka  File: MongoSourceConfigTest.java
@Test
@DisplayName("test fullDocument")
void testFullDocument() {
  assertAll(
      "fullDocument checks",
      () -> assertEquals(Optional.empty(), createSourceConfig().getFullDocument()),
      () ->
          assertEquals(
              Optional.empty(), createSourceConfig(FULL_DOCUMENT_CONFIG, "").getFullDocument()),
      () ->
          assertEquals(
              Optional.of(FullDocument.DEFAULT),
              createSourceConfig(FULL_DOCUMENT_CONFIG, FullDocument.DEFAULT.getValue())
                  .getFullDocument()),
      () ->
          assertEquals(
              Optional.of(FullDocument.UPDATE_LOOKUP),
              createSourceConfig(FULL_DOCUMENT_CONFIG, FullDocument.UPDATE_LOOKUP.getValue())
                  .getFullDocument()),
      () -> assertInvalid(FULL_DOCUMENT_CONFIG, "madeUp"));
}
 
Example 2  Project: mongo-kafka  File: MongoSourceConfig.java
public Optional<FullDocument> getFullDocument() {
  if (getBoolean(PUBLISH_FULL_DOCUMENT_ONLY_CONFIG)) {
    return Optional.of(FullDocument.UPDATE_LOOKUP);
  } else {
    return fullDocumentFromString(getString(FULL_DOCUMENT_CONFIG));
  }
}
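
A hedged sketch of how a caller might apply the Optional returned above when building a change stream (the helper and its signature are illustrative, not the connector's actual code):

import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCursor;
import org.bson.BsonDocument;
import org.bson.Document;

// Illustrative helper: set fullDocument only when a value was configured.
static MongoCursor<BsonDocument> openCursor(MongoClient client, MongoSourceConfig config) {
  ChangeStreamIterable<Document> stream = client.watch();
  // fullDocument(...) returns the iterable itself, so its result can be ignored here.
  config.getFullDocument().ifPresent(stream::fullDocument);
  return stream.withDocumentClass(BsonDocument.class).iterator();
}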
 
Example 3  Project: mongo-kafka  File: ConfigHelper.java
public static Optional<FullDocument> fullDocumentFromString(final String fullDocument) {
  if (fullDocument.isEmpty()) {
    return Optional.empty();
  } else {
    return Optional.of(FullDocument.fromString(fullDocument));
  }
}
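
For reference, the strings accepted here are the enum's wire values; at the driver versions these projects target, those are "default" and "updateLookup", and FullDocument.fromString is expected to throw IllegalArgumentException for anything else, which is what makes the "madeUp" case in Example 1 invalid. A small sketch:

import com.mongodb.client.model.changestream.FullDocument;

public class FullDocumentValues {
  public static void main(String[] args) {
    System.out.println(FullDocument.DEFAULT.getValue());       // default
    System.out.println(FullDocument.UPDATE_LOOKUP.getValue()); // updateLookup
    // Round-trip through the wire value:
    System.out.println(FullDocument.fromString("updateLookup") == FullDocument.UPDATE_LOOKUP); // true
  }
}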
 
Example 4  Project: quarkus  File: ConnectionToReplicaSetTest.java
@Test
void testThatWatchStreamCanBeConnected() {
    String cs = "mongodb://localhost:27018,localhost:27019";
    client = new ReactiveMongoClientImpl(MongoClients.create(cs));
    List<Throwable> failures = new CopyOnWriteArrayList<>();
    client.watch().onFailure().invoke(failures::add)
            .onItem().ignoreAsUni()
            .subscribeAsCompletionStage();
    client.watch(Document.class).onFailure().invoke(failures::add)
            .onItem().ignoreAsUni()
            .subscribeAsCompletionStage();
    client.watch(Collections.emptyList()).onFailure().invoke(failures::add)
            .onItem().ignoreAsUni()
            .subscribeAsCompletionStage();
    client.watch(Collections.emptyList(), Document.class).onFailure().invoke(failures::add)
            .onItem().ignoreAsUni()
            .subscribeAsCompletionStage();
    client.watch(Collections.emptyList(), Document.class, null).onFailure().invoke(failures::add)
            .onItem().ignoreAsUni()
            .subscribeAsCompletionStage();
    client.watch(Collections.emptyList(), Document.class,
            new ChangeStreamOptions().maxAwaitTime(1, TimeUnit.SECONDS)).onFailure().invoke(failures::add)
            .onItem().ignoreAsUni()
            .subscribeAsCompletionStage();
    client.watch(new ChangeStreamOptions().fullDocument(FullDocument.DEFAULT))
            .onFailure().invoke(failures::add)
            .onItem().ignoreAsUni()
            .subscribeAsCompletionStage();
    client.watch((ChangeStreamOptions) null).onFailure().invoke(failures::add)
            .onItem().ignoreAsUni()
            .subscribeAsCompletionStage();
    client.watch(Document.class)
            .onFailure().invoke(failures::add)
            .onItem().ignoreAsUni()
            .subscribeAsCompletionStage();
    assertThat(failures).isEmpty();
}
 
Example 5  Project: syncer  File: MongoV4MasterConnector.java
private void configQuery(MongoConnection connection, ConsumerRegistry registry, boolean updateLookUp) {
  List<Bson> pipeline = singletonList(Aggregates.match(
      Filters.and(
          Filters.in(NS, getNamespaces(connection, registry)),
          Filters.in("operationType", asList("insert", "delete", "replace", "update")))));
  changeStreamDocuments = client.watch(pipeline).batchSize(MONGO_CHANGE_STREAM_BATCH_SIZE);

  DocTimestamp docTimestamp = registry.votedMongoId(connection);
  if (DocTimestamp.earliest == docTimestamp) {
    MongoCursor<Document> firstLog = client.getDatabase(LOCAL).getCollection(OPLOG_RS).find(new Document()).limit(1).iterator();
    if (firstLog.hasNext()) {
      Document next = firstLog.next();
      logger.info("Connect to earliest oplog time: {}", next.get(TS));
      changeStreamDocuments.startAtOperationTime(((BsonTimestamp) next.get(TS)));
    } else {
      logger.info("Document not found in local.oplog.rs -- is this a new and empty db instance?");
      changeStreamDocuments.startAtOperationTime(docTimestamp.getTimestamp());
    }
  } else {
    /*
    Optional. The starting point for the change stream.
    If the specified starting point is in the past, it must be in the time range of the oplog.
    To check the time range of the oplog, see rs.printReplicationInfo().
     */
    changeStreamDocuments.startAtOperationTime(docTimestamp.getTimestamp());
  }
  // UPDATE_LOOKUP: return the most current majority-committed version of the updated document,
  // i.e. reading the same event at different times may yield a different fullDocument.
  changeStreamDocuments.fullDocument(updateLookUp ? FullDocument.UPDATE_LOOKUP : FullDocument.DEFAULT);
}
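
What the UPDATE_LOOKUP comment above means for consumers, as a hedged sketch: the lookup happens when the event is read, so the fullDocument can reflect later writes, and delete events carry no fullDocument at all.

import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.OperationType;
import org.bson.Document;

// Sketch: consuming a cursor like the one configured above.
static void consume(MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor) {
  while (cursor.hasNext()) {
    ChangeStreamDocument<Document> event = cursor.next();
    if (event.getOperationType() == OperationType.DELETE) {
      // Deletes never include a fullDocument; only the document key is available.
      System.out.println("deleted: " + event.getDocumentKey());
    } else {
      // With UPDATE_LOOKUP this is the majority-committed state at read time.
      System.out.println("current: " + event.getFullDocument());
    }
  }
}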
 
Example 6  Project: immutables  File: MongoSession.java
private <X> Publisher<X> watch(StandardOperations.Watch operation) {
  final MongoCollection<X> collection = (MongoCollection<X>) this.collection;
  final Bson filter = new Document("fullDocument", toBsonFilter(operation.query()));
  // A bare document is not a valid pipeline stage, so wrap the filter in $match.
  return Flowable.fromPublisher(collection.watch(Collections.singletonList(Aggregates.match(filter)))
          .fullDocument(FullDocument.UPDATE_LOOKUP)
          .withDocumentClass(collection.getDocumentClass()));
}
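
The pattern above works because, with FullDocument.UPDATE_LOOKUP, the change event exposes the looked-up document under its fullDocument field, which a $match stage can then filter on. The same idea with the sync driver and an explicit field filter (the field name "status" is illustrative):

import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.Document;

import java.util.Collections;

// Sketch: only receive events whose looked-up document matches the filter.
static ChangeStreamIterable<Document> watchActive(MongoCollection<Document> collection) {
  return collection
      .watch(Collections.singletonList(
          Aggregates.match(Filters.eq("fullDocument.status", "ACTIVE"))))
      .fullDocument(FullDocument.UPDATE_LOOKUP);
}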
 
Example 7  Project: mongo-kafka  File: MongoSourceTaskTest.java
@Test
@DisplayName("test creates the expected collection cursor")
void testCreatesExpectedCollectionCursor() {
  MongoSourceTask task = new MongoSourceTask();
  Map<String, String> cfgMap = new HashMap<>();
  cfgMap.put(CONNECTION_URI_CONFIG, "mongodb://localhost");
  cfgMap.put(DATABASE_CONFIG, TEST_DATABASE);
  cfgMap.put(COLLECTION_CONFIG, TEST_COLLECTION);
  MongoSourceConfig cfg = new MongoSourceConfig(cfgMap);

  when(mongoClient.getDatabase(TEST_DATABASE)).thenReturn(mongoDatabase);
  when(mongoDatabase.getCollection(TEST_COLLECTION)).thenReturn(mongoCollection);
  when(mongoCollection.watch()).thenReturn(changeStreamIterable);
  when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
  when(mongoIterable.iterator()).thenReturn(mongoCursor);

  task.createCursor(cfg, mongoClient);

  verify(mongoClient, times(1)).getDatabase(TEST_DATABASE);
  verify(mongoDatabase, times(1)).getCollection(TEST_COLLECTION);
  verify(mongoCollection, times(1)).watch();
  verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
  verify(mongoIterable, times(1)).iterator();

  // Pipeline
  resetMocks();
  cfgMap.put(PIPELINE_CONFIG, "[{$match: {operationType: 'insert'}}]");
  cfg = new MongoSourceConfig(cfgMap);

  when(mongoClient.getDatabase(TEST_DATABASE)).thenReturn(mongoDatabase);
  when(mongoDatabase.getCollection(TEST_COLLECTION)).thenReturn(mongoCollection);
  when(mongoCollection.watch(cfg.getPipeline().get())).thenReturn(changeStreamIterable);
  when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
  when(mongoIterable.iterator()).thenReturn(mongoCursor);

  task.createCursor(cfg, mongoClient);

  verify(mongoClient, times(1)).getDatabase(TEST_DATABASE);
  verify(mongoDatabase, times(1)).getCollection(TEST_COLLECTION);
  verify(mongoCollection, times(1)).watch(cfg.getPipeline().get());
  verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
  verify(mongoIterable, times(1)).iterator();

  // Complex
  resetMocks();
  cfgMap.put(BATCH_SIZE_CONFIG, "101");

  FullDocument fullDocument = FullDocument.UPDATE_LOOKUP;
  cfgMap.put(FULL_DOCUMENT_CONFIG, fullDocument.getValue());
  Collation collation =
      Collation.builder()
          .locale("en")
          .caseLevel(true)
          .collationCaseFirst(CollationCaseFirst.OFF)
          .collationStrength(CollationStrength.IDENTICAL)
          .collationAlternate(CollationAlternate.SHIFTED)
          .collationMaxVariable(CollationMaxVariable.SPACE)
          .numericOrdering(true)
          .normalization(true)
          .backwards(true)
          .build();
  cfgMap.put(COLLATION_CONFIG, collation.asDocument().toJson());

  cfg = new MongoSourceConfig(cfgMap);

  task.initialize(context);
  when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
  when(offsetStorageReader.offset(task.createPartitionMap(cfg))).thenReturn(OFFSET);

  when(mongoClient.getDatabase(TEST_DATABASE)).thenReturn(mongoDatabase);
  when(mongoDatabase.getCollection(TEST_COLLECTION)).thenReturn(mongoCollection);
  when(mongoCollection.watch(cfg.getPipeline().get())).thenReturn(changeStreamIterable);
  when(changeStreamIterable.batchSize(101)).thenReturn(changeStreamIterable);
  when(changeStreamIterable.fullDocument(fullDocument)).thenReturn(changeStreamIterable);
  when(changeStreamIterable.collation(collation)).thenReturn(changeStreamIterable);
  when(changeStreamIterable.startAfter(RESUME_TOKEN)).thenReturn(changeStreamIterable);
  when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
  when(mongoIterable.iterator()).thenReturn(mongoCursor);

  task.createCursor(cfg, mongoClient);

  verify(mongoClient, times(1)).getDatabase(TEST_DATABASE);
  verify(mongoDatabase, times(1)).getCollection(TEST_COLLECTION);
  verify(mongoCollection, times(1)).watch(cfg.getPipeline().get());
  verify(changeStreamIterable, times(1)).batchSize(101);
  verify(changeStreamIterable, times(1)).fullDocument(fullDocument);
  verify(changeStreamIterable, times(1)).collation(collation);
  verify(changeStreamIterable, times(1)).startAfter(RESUME_TOKEN);
  verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
  verify(mongoIterable, times(1)).iterator();
}
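
Examples 7–9 verify the same fluent chain against collection-, database-, and client-scoped change streams. Stripped of the mocks, the cursor being asserted on is built roughly like this (a sketch, not the connector's actual createCursor):

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.List;

// Sketch of the chain the stubs above exercise; pipeline, collation, and
// resumeToken stand in for values read from the config and offset storage.
static MongoCursor<BsonDocument> buildCursor(
    MongoClient client, String db, String coll,
    List<Bson> pipeline, Collation collation, BsonDocument resumeToken) {
  return client.getDatabase(db).getCollection(coll)
      .watch(pipeline)
      .batchSize(101)
      .fullDocument(FullDocument.UPDATE_LOOKUP)
      .collation(collation)
      .startAfter(resumeToken)
      .withDocumentClass(BsonDocument.class)
      .iterator();
}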
 
Example 8  Project: mongo-kafka  File: MongoSourceTaskTest.java
@Test
@DisplayName("test creates the expected database cursor")
void testCreatesExpectedDatabaseCursor() {
  MongoSourceTask task = new MongoSourceTask();
  Map<String, String> cfgMap = new HashMap<>();
  cfgMap.put(CONNECTION_URI_CONFIG, "mongodb://localhost");
  cfgMap.put(DATABASE_CONFIG, TEST_DATABASE);
  MongoSourceConfig cfg = new MongoSourceConfig(cfgMap);

  when(mongoClient.getDatabase(TEST_DATABASE)).thenReturn(mongoDatabase);
  when(mongoDatabase.watch()).thenReturn(changeStreamIterable);
  when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
  when(mongoIterable.iterator()).thenReturn(mongoCursor);

  task.createCursor(cfg, mongoClient);

  verify(mongoClient, times(1)).getDatabase(TEST_DATABASE);
  verify(mongoDatabase, times(1)).watch();
  verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
  verify(mongoIterable, times(1)).iterator();

  // Pipeline
  resetMocks();
  cfgMap.put(PIPELINE_CONFIG, "[{$match: {operationType: 'insert'}}]");
  cfg = new MongoSourceConfig(cfgMap);

  when(mongoClient.getDatabase(TEST_DATABASE)).thenReturn(mongoDatabase);
  when(mongoDatabase.watch(cfg.getPipeline().get())).thenReturn(changeStreamIterable);
  when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
  when(mongoIterable.iterator()).thenReturn(mongoCursor);

  task.createCursor(cfg, mongoClient);

  verify(mongoClient, times(1)).getDatabase(TEST_DATABASE);
  verify(mongoDatabase, times(1)).watch(cfg.getPipeline().get());
  verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
  verify(mongoIterable, times(1)).iterator();

  // Complex
  resetMocks();
  cfgMap.put(BATCH_SIZE_CONFIG, "101");

  FullDocument fullDocument = FullDocument.UPDATE_LOOKUP;
  cfgMap.put(FULL_DOCUMENT_CONFIG, fullDocument.getValue());
  Collation collation =
      Collation.builder()
          .locale("en")
          .caseLevel(true)
          .collationCaseFirst(CollationCaseFirst.OFF)
          .collationStrength(CollationStrength.IDENTICAL)
          .collationAlternate(CollationAlternate.SHIFTED)
          .collationMaxVariable(CollationMaxVariable.SPACE)
          .numericOrdering(true)
          .normalization(true)
          .backwards(true)
          .build();
  cfgMap.put(COLLATION_CONFIG, collation.asDocument().toJson());

  cfg = new MongoSourceConfig(cfgMap);

  task.initialize(context);
  when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
  when(offsetStorageReader.offset(task.createPartitionMap(cfg))).thenReturn(OFFSET);

  when(mongoClient.getDatabase(TEST_DATABASE)).thenReturn(mongoDatabase);
  when(mongoDatabase.watch(cfg.getPipeline().get())).thenReturn(changeStreamIterable);
  when(changeStreamIterable.batchSize(101)).thenReturn(changeStreamIterable);
  when(changeStreamIterable.fullDocument(fullDocument)).thenReturn(changeStreamIterable);
  when(changeStreamIterable.collation(collation)).thenReturn(changeStreamIterable);
  when(changeStreamIterable.startAfter(RESUME_TOKEN)).thenReturn(changeStreamIterable);
  when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
  when(mongoIterable.iterator()).thenReturn(mongoCursor);

  task.createCursor(cfg, mongoClient);

  verify(mongoClient, times(1)).getDatabase(TEST_DATABASE);
  verify(mongoDatabase, times(1)).watch(cfg.getPipeline().get());
  verify(changeStreamIterable, times(1)).batchSize(101);
  verify(changeStreamIterable, times(1)).fullDocument(fullDocument);
  verify(changeStreamIterable, times(1)).collation(collation);
  verify(changeStreamIterable, times(1)).startAfter(RESUME_TOKEN);
  verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
  verify(mongoIterable, times(1)).iterator();
}
 
Example 9  Project: mongo-kafka  File: MongoSourceTaskTest.java
@Test
@DisplayName("test creates the expected client cursor")
void testCreatesExpectedClientCursor() {
  MongoSourceTask task = new MongoSourceTask();
  Map<String, String> cfgMap = new HashMap<>();
  cfgMap.put(CONNECTION_URI_CONFIG, "mongodb://localhost");
  MongoSourceConfig cfg = new MongoSourceConfig(cfgMap);

  when(mongoClient.watch()).thenReturn(changeStreamIterable);
  when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
  when(mongoIterable.iterator()).thenReturn(mongoCursor);

  task.createCursor(cfg, mongoClient);

  verify(mongoClient, times(1)).watch();
  verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
  verify(mongoIterable, times(1)).iterator();

  // Pipeline
  resetMocks();
  cfgMap.put(PIPELINE_CONFIG, "[{$match: {operationType: 'insert'}}]");
  cfg = new MongoSourceConfig(cfgMap);

  when(mongoClient.watch(cfg.getPipeline().get())).thenReturn(changeStreamIterable);
  when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
  when(mongoIterable.iterator()).thenReturn(mongoCursor);

  task.createCursor(cfg, mongoClient);

  verify(mongoClient, times(1)).watch(cfg.getPipeline().get());
  verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
  verify(mongoIterable, times(1)).iterator();

  // Complex
  resetMocks();
  cfgMap.put(BATCH_SIZE_CONFIG, "101");

  FullDocument fullDocument = FullDocument.UPDATE_LOOKUP;
  cfgMap.put(FULL_DOCUMENT_CONFIG, fullDocument.getValue());
  Collation collation =
      Collation.builder()
          .locale("en")
          .caseLevel(true)
          .collationCaseFirst(CollationCaseFirst.OFF)
          .collationStrength(CollationStrength.IDENTICAL)
          .collationAlternate(CollationAlternate.SHIFTED)
          .collationMaxVariable(CollationMaxVariable.SPACE)
          .numericOrdering(true)
          .normalization(true)
          .backwards(true)
          .build();
  cfgMap.put(COLLATION_CONFIG, collation.asDocument().toJson());
  cfg = new MongoSourceConfig(cfgMap);

  task.initialize(context);
  when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
  when(offsetStorageReader.offset(task.createPartitionMap(cfg))).thenReturn(OFFSET);

  when(mongoClient.watch(cfg.getPipeline().get())).thenReturn(changeStreamIterable);
  when(changeStreamIterable.batchSize(101)).thenReturn(changeStreamIterable);
  when(changeStreamIterable.fullDocument(fullDocument)).thenReturn(changeStreamIterable);
  when(changeStreamIterable.collation(collation)).thenReturn(changeStreamIterable);
  when(changeStreamIterable.startAfter(RESUME_TOKEN)).thenReturn(changeStreamIterable);
  when(changeStreamIterable.withDocumentClass(BsonDocument.class)).thenReturn(mongoIterable);
  when(mongoIterable.iterator()).thenReturn(mongoCursor);

  task.createCursor(cfg, mongoClient);

  verify(mongoClient, times(1)).watch(cfg.getPipeline().get());
  verify(changeStreamIterable, times(1)).batchSize(101);
  verify(changeStreamIterable, times(1)).fullDocument(fullDocument);
  verify(changeStreamIterable, times(1)).collation(collation);
  verify(changeStreamIterable, times(1)).startAfter(RESUME_TOKEN);
  verify(changeStreamIterable, times(1)).withDocumentClass(BsonDocument.class);
  verify(mongoIterable, times(1)).iterator();
}
 