类com.mongodb.client.model.DeleteOneModel源码实例Demo

下面列出了如何使用 com.mongodb.client.model.DeleteOneModel 的 API 类实例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: mongo-kafka   文件: RdbmsDelete.java
/**
 * Builds the delete write model for a sink document.
 *
 * @param doc the sink document; both its key and value documents must be present
 * @return a {@link DeleteOneModel} whose filter comes from {@code RdbmsHandler.generateFilterDoc}
 * @throws DataException if the key or value document is missing, or if creating the filter fails
 */
@Override
public WriteModel<BsonDocument> perform(final SinkDocument doc) {
  final BsonDocument key =
      doc.getKeyDoc()
          .orElseThrow(
              () -> new DataException("Error: key doc must not be missing for delete operation"));
  final BsonDocument value =
      doc.getValueDoc()
          .orElseThrow(
              () ->
                  new DataException("Error: value doc must not be missing for delete operation"));

  try {
    return new DeleteOneModel<>(
        RdbmsHandler.generateFilterDoc(key, value, OperationType.DELETE));
  } catch (Exception exc) {
    // Any failure while creating the filter is surfaced as a DataException with the cause kept.
    throw new DataException(exc);
  }
}
 
源代码2 项目: mongo-kafka   文件: RdbmsDeleteTest.java
@Test
@DisplayName("when valid cdc event with single field PK then correct DeleteOneModel")
void testValidSinkDocumentSingleFieldPK() {
  // Input: the key holds the single field "id", the value document has op 'd'.
  SinkDocument deleteEvent =
      new SinkDocument(BsonDocument.parse("{id: 1004}"), BsonDocument.parse("{op: 'd'}"));
  // Expected filter: the key document nested under _id.
  BsonDocument expectedFilter = BsonDocument.parse("{_id: {id: 1004}}");

  WriteModel<BsonDocument> produced = RDBMS_DELETE.perform(deleteEvent);

  assertTrue(produced instanceof DeleteOneModel, "result expected to be of type DeleteOneModel");
  DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) produced;
  assertTrue(
      deleteModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(expectedFilter, deleteModel.getFilter());
}
 
源代码3 项目: mongo-kafka   文件: RdbmsDeleteTest.java
@Test
@DisplayName("when valid cdc event with compound PK then correct DeleteOneModel")
void testValidSinkDocumentCompoundPK() {
  // Input: a key made of two fields, and a value document with op 'd'.
  final BsonDocument compoundKey = BsonDocument.parse("{idA: 123, idB: 'ABC'}");
  final BsonDocument deleteValue = BsonDocument.parse("{op: 'd'}");
  // Expected filter: both key fields nested under _id.
  final BsonDocument expectedFilter = BsonDocument.parse("{_id: {idA: 123, idB: 'ABC'}}");

  final WriteModel<BsonDocument> produced =
      RDBMS_DELETE.perform(new SinkDocument(compoundKey, deleteValue));
  assertTrue(produced instanceof DeleteOneModel, "result expected to be of type DeleteOneModel");

  final DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) produced;
  assertTrue(
      deleteModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(expectedFilter, deleteModel.getFilter());
}
 
源代码4 项目: mongo-kafka   文件: RdbmsDeleteTest.java
@Test
@DisplayName("when valid cdc event without PK then correct DeleteOneModel")
void testValidSinkDocumentNoPK() {
  // Input: an empty key document; the value carries the row contents in its "before" field.
  BsonDocument emptyKey = new BsonDocument();
  BsonDocument valueWithBefore =
      BsonDocument.parse("{op: 'c', before: {text: 'misc', number: 9876, active: true}}");
  // Expected filter: exactly the fields of the "before" document.
  BsonDocument expectedFilter =
      BsonDocument.parse("{text: 'misc', number: 9876, active: true}");

  WriteModel<BsonDocument> produced =
      RDBMS_DELETE.perform(new SinkDocument(emptyKey, valueWithBefore));

  assertTrue(produced instanceof DeleteOneModel, "result expected to be of type DeleteOneModel");
  DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) produced;
  assertTrue(
      deleteModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(expectedFilter, deleteModel.getFilter());
}
 
源代码5 项目: mongo-kafka   文件: WriteModelStrategyTest.java
@Test
@DisplayName(
    "when sink document is valid for DeleteOneDefaultStrategy then correct DeleteOneModel")
void testDeleteOneDefaultStrategyWitValidSinkDocument() {
  // Sink document with a key only; the value document is null.
  SinkDocument keyOnlyDocument = new SinkDocument(BsonDocument.parse("{id: 1234}"), null);

  WriteModel<BsonDocument> produced =
      DELETE_ONE_DEFAULT_STRATEGY.createWriteModel(keyOnlyDocument);
  assertTrue(produced instanceof DeleteOneModel, "result expected to be of type DeleteOneModel");

  DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) produced;
  assertTrue(
      deleteModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC_DELETE_DEFAULT, deleteModel.getFilter());
}
 
源代码6 项目: aion   文件: MongoDB.java
/**
 * Adds a new edit to the batch.
 *
 * <p>A {@code null} value queues a delete of the document whose id field equals {@code key}.
 * Any other value queues an upsert that sets the value field of that document.
 *
 * @param key the key to write
 * @param value the value to write. Null indicates we should delete this key
 * @return this
 */
public WriteBatch addEdit(byte[] key, byte[] value) {
    // The models are constructed directly inside edits.add(...) so the diamond is inferred from
    // the element type of edits; the previous raw-typed locals made these calls unchecked.
    if (value == null) {
        edits.add(new DeleteOneModel<>(eq(MongoConstants.ID_FIELD_NAME, new BsonBinary(key))));
    } else {
        edits.add(
                new UpdateOneModel<>(
                        eq(MongoConstants.ID_FIELD_NAME, new BsonBinary(key)),
                        Updates.set(MongoConstants.VALUE_FIELD_NAME, new BsonBinary(value)),
                        // upsert: the update options request creation when nothing matches
                        new UpdateOptions().upsert(true)));
    }

    return this;
}
 
源代码7 项目: quarkus   文件: CollectionManagementTest.java
@Test
void bulkWrite() {
    ReactiveMongoCollection<Document> testCollection =
            client.getDatabase(DATABASE).getCollection("test");

    // One mixed batch: three inserts, then an update, a delete and a replace.
    BulkWriteResult outcome = testCollection
            .bulkWrite(Arrays.asList(
                    new InsertOneModel<>(new Document("_id", 4)),
                    new InsertOneModel<>(new Document("_id", 5)),
                    new InsertOneModel<>(new Document("_id", 6)),
                    new UpdateOneModel<>(
                            new Document("_id", 1),
                            new Document("$set", new Document("x", 2))),
                    new DeleteOneModel<>(new Document("_id", 2)),
                    new ReplaceOneModel<>(
                            new Document("_id", 3),
                            new Document("_id", 3).append("x", 4))))
            .await()
            .indefinitely();

    // Only the three inserts are expected to take effect; the delete is expected to remove nothing.
    assertThat(outcome.getDeletedCount()).isEqualTo(0);
    assertThat(outcome.getInsertedCount()).isEqualTo(3);
}
 
源代码8 项目: quarkus   文件: CollectionManagementTest.java
@Test
void bulkWriteWithOptions() {
    ReactiveMongoCollection<Document> testCollection =
            client.getDatabase(DATABASE).getCollection("test");

    // Same mixed batch as the option-less variant, submitted with ordered(true).
    BulkWriteResult outcome = testCollection
            .bulkWrite(
                    Arrays.asList(
                            new InsertOneModel<>(new Document("_id", 4)),
                            new InsertOneModel<>(new Document("_id", 5)),
                            new InsertOneModel<>(new Document("_id", 6)),
                            new UpdateOneModel<>(
                                    new Document("_id", 1),
                                    new Document("$set", new Document("x", 2))),
                            new DeleteOneModel<>(new Document("_id", 2)),
                            new ReplaceOneModel<>(
                                    new Document("_id", 3),
                                    new Document("_id", 3).append("x", 4))),
                    new BulkWriteOptions().ordered(true))
            .await()
            .indefinitely();

    // Only the three inserts are expected to take effect; the delete is expected to remove nothing.
    assertThat(outcome.getDeletedCount()).isEqualTo(0);
    assertThat(outcome.getInsertedCount()).isEqualTo(3);
}
 
源代码9 项目: kafka-connect-mongodb   文件: RdbmsDelete.java
/**
 * Builds the delete write model for a sink document.
 *
 * @param doc the sink document; both its key and value documents must be present
 * @return a {@link DeleteOneModel} whose filter comes from {@code RdbmsHandler.generateFilterDoc}
 * @throws DataException if the key or value document is missing, or if creating the filter fails
 */
@Override
public WriteModel<BsonDocument> perform(SinkDocument doc) {
    final BsonDocument key = doc.getKeyDoc()
            .orElseThrow(() ->
                    new DataException("error: key doc must not be missing for delete operation"));
    final BsonDocument value = doc.getValueDoc()
            .orElseThrow(() ->
                    new DataException("error: value doc must not be missing for delete operation"));

    try {
        return new DeleteOneModel<>(
                RdbmsHandler.generateFilterDoc(key, value, OperationType.DELETE));
    } catch (Exception exc) {
        // Any failure while creating the filter is surfaced as a DataException with the cause kept.
        throw new DataException(exc);
    }
}
 
源代码10 项目: kafka-connect-mongodb   文件: MongoDbDelete.java
/**
 * Builds the delete write model from the key document of a sink document.
 *
 * <p>The string stored in the key under {@code MongoDbHandler.JSON_ID_FIELD_PATH} is spliced
 * verbatim into a JSON document as the value of the id field, and that JSON is parsed to
 * obtain the filter.
 *
 * @param doc the sink document; its key document must be present
 * @return a {@link DeleteOneModel} filtering on the id field
 * @throws DataException if the key document is missing, or if reading the id or parsing fails
 */
@Override
public WriteModel<BsonDocument> perform(SinkDocument doc) {
    final BsonDocument key = doc.getKeyDoc()
            .orElseThrow(() ->
                    new DataException("error: key doc must not be missing for delete operation"));

    try {
        // The id is read as a string and inserted unquoted, so it is interpreted as JSON.
        final String idJson = key.getString(MongoDbHandler.JSON_ID_FIELD_PATH).getValue();
        final String filterJson = "{" + DBCollection.ID_FIELD_NAME + ":" + idJson + "}";
        return new DeleteOneModel<>(BsonDocument.parse(filterJson));
    } catch (Exception exc) {
        throw new DataException(exc);
    }
}
 
/**
 * Creates the delete write model for a sink document based on its key document.
 *
 * @param document the sink document; its key document must be present
 * @return a {@link DeleteOneModel} filtering on the id field
 * @throws DataException if the key document is missing
 */
@Override
public WriteModel<BsonDocument> createWriteModel(SinkDocument document) {
    final BsonDocument keyDoc = document.getKeyDoc()
            .orElseThrow(() -> new DataException("error: cannot build the WriteModel since"
                    + " the key document was missing unexpectedly"));

    // NOTE: fallback for backwards / deprecation compatibility when no id strategy is set
    if (idStrategy == null) {
        if (keyDoc.containsKey(DBCollection.ID_FIELD_NAME)) {
            // The key already carries the id field, so it is used as the filter as-is.
            return new DeleteOneModel<>(keyDoc);
        }
        // Otherwise the whole key document becomes the value of the id field.
        return new DeleteOneModel<>(new BsonDocument(DBCollection.ID_FIELD_NAME, keyDoc));
    }

    // NOTE: current design doesn't allow to access original SinkRecord (= null)
    final BsonValue generatedId = idStrategy.generateId(document, null);
    return new DeleteOneModel<>(new BsonDocument(DBCollection.ID_FIELD_NAME, generatedId));
}
 
源代码12 项目: kafka-connect-mongodb   文件: RdbmsDeleteTest.java
@Test
@DisplayName("when valid cdc event with single field PK then correct DeleteOneModel")
public void testValidSinkDocumentSingleFieldPK() {
    // Input: the key holds the single field "id", the value document has op "d".
    BsonDocument singleFieldKey = new BsonDocument("id", new BsonInt32(1004));
    BsonDocument deleteValue = new BsonDocument("op", new BsonString("d"));

    // Expected filter: an equal key document nested under the id field.
    BsonDocument expectedFilter = new BsonDocument(
            DBCollection.ID_FIELD_NAME,
            new BsonDocument("id", new BsonInt32(1004)));

    WriteModel<BsonDocument> produced =
            RDBMS_DELETE.perform(new SinkDocument(singleFieldKey, deleteValue));
    assertTrue(produced instanceof DeleteOneModel,
            () -> "result expected to be of type DeleteOneModel");

    DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) produced;
    assertTrue(deleteModel.getFilter() instanceof BsonDocument,
            () -> "filter expected to be of type BsonDocument");
    assertEquals(expectedFilter, deleteModel.getFilter());
}
 
源代码13 项目: kafka-connect-mongodb   文件: MongoDbDeleteTest.java
@Test
@DisplayName("when valid cdc event then correct DeleteOneModel")
public void testValidSinkDocument() {
    // Sink document with a key only (id stored as the string "1004"); the value is null.
    SinkDocument keyOnlyDocument =
            new SinkDocument(new BsonDocument("id", new BsonString("1004")), null);

    WriteModel<BsonDocument> produced = MONGODB_DELETE.perform(keyOnlyDocument);
    assertTrue(produced instanceof DeleteOneModel,
            () -> "result expected to be of type DeleteOneModel");

    DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) produced;
    assertTrue(deleteModel.getFilter() instanceof BsonDocument,
            () -> "filter expected to be of type BsonDocument");
    assertEquals(FILTER_DOC, deleteModel.getFilter());
}
 
源代码14 项目: core-ng-project   文件: MongoCollectionImpl.java
/**
 * Deletes the documents whose {@code _id} equals one of the given ids, using one unordered
 * bulk write with a {@link DeleteOneModel} per id.
 *
 * @param ids the ids of the documents to delete
 * @return the deleted count reported by the bulk write result
 */
@Override
public long bulkDelete(List<?> ids) {
    var watch = new StopWatch();
    int size = ids.size();
    // Stays 0 if the bulk write throws, so the tracking in finally reports no deleted rows.
    int deletedRows = 0;
    try {
        List<DeleteOneModel<T>> deletes = new ArrayList<>(size);
        for (Object id : ids) {
            deletes.add(new DeleteOneModel<>(Filters.eq("_id", id)));
        }
        deletedRows = collection()
                .bulkWrite(deletes, new BulkWriteOptions().ordered(false))
                .getDeletedCount();
        return deletedRows;
    } finally {
        // Tracking, logging and the slow-operation check run on success and on failure alike.
        long elapsed = watch.elapsed();
        ActionLogContext.track("mongo", elapsed, 0, deletedRows);
        logger.debug("bulkDelete, collection={}, ids={}, size={}, deletedRows={}, elapsed={}", collectionName, ids, size, deletedRows, elapsed);
        checkSlowOperation(elapsed);
    }
}
 
源代码15 项目: mongo-kafka   文件: MongoDbDelete.java
/**
 * Builds the delete write model from the key document of a sink document.
 *
 * <p>The string stored in the key under {@code JSON_ID_FIELD} is formatted verbatim into a
 * JSON document as the value of {@code ID_FIELD}, and that JSON is parsed to obtain the filter.
 *
 * @param doc the sink document; its key document must be present
 * @return a {@link DeleteOneModel} filtering on {@code ID_FIELD}
 * @throws DataException if the key document is missing, or if reading the id or parsing fails
 */
@Override
public WriteModel<BsonDocument> perform(final SinkDocument doc) {
  final BsonDocument key =
      doc.getKeyDoc()
          .orElseThrow(
              () -> new DataException("Error: key doc must not be missing for delete operation"));

  try {
    // The id is read as a string and inserted unquoted, so it is interpreted as JSON.
    final String idJson = key.getString(JSON_ID_FIELD).getValue();
    final String filterJson = format("{%s: %s}", ID_FIELD, idJson);
    return new DeleteOneModel<>(BsonDocument.parse(filterJson));
  } catch (Exception exc) {
    throw new DataException(exc);
  }
}
 
源代码16 项目: mongo-kafka   文件: MongoDbDeleteTest.java
@Test
@DisplayName("when valid cdc event then correct DeleteOneModel")
void testValidSinkDocument() {
  // Sink document with a key only (id stored as the string '1234'); the value is null.
  SinkDocument keyOnlyDocument = new SinkDocument(BsonDocument.parse("{id: '1234'}"), null);

  WriteModel<BsonDocument> produced = DELETE.perform(keyOnlyDocument);
  assertTrue(produced instanceof DeleteOneModel, "result expected to be of type DeleteOneModel");

  DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) produced;
  assertTrue(
      deleteModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC, deleteModel.getFilter());
}
 
源代码17 项目: mongo-kafka   文件: MongoSinkTaskTest.java
@Test
@DisplayName(
    "test DeleteOneDefaultStrategy with custom config and sink records with keys and null values")
void testBuildDeleteOneModelsCustomConfigSinkRecordsWithKeyAndNullValuePresent() {
  MongoSinkTask task = new MongoSinkTask();

  // Topic config: full-key id strategy, replace-one write model strategy, delete-on-null "true".
  String topicConfigJson =
      format(
          "{'%s': '%s', '%s': '%s', '%s': %s}",
          DOCUMENT_ID_STRATEGY_CONFIG,
          FullKeyStrategy.class.getName(),
          WRITEMODEL_STRATEGY_CONFIG,
          ReplaceOneDefaultStrategy.class.getName(),
          DELETE_ON_NULL_VALUES_CONFIG,
          "true");
  MongoSinkTopicConfig topicConfig = createTopicConfig(topicConfigJson);

  // Records are generated with a struct key and no value schema/value configured here.
  Schema structKeySchema = SchemaBuilder.struct().field("myKeyField", Schema.STRING_SCHEMA);
  TopicSettingsAndResults topicSettings = new TopicSettingsAndResults(TEST_TOPIC, 10, 0);
  topicSettings.setKeySchema(structKeySchema);
  topicSettings.setKey(new Struct(structKeySchema).put("myKeyField", "ABCD-1234"));

  List<SinkRecord> records = createSinkRecordList(topicSettings);
  List<? extends WriteModel> models = task.buildWriteModel(topicConfig, records);

  assertNotNull(models, "WriteModel list was null");
  assertFalse(models.isEmpty(), "WriteModel list was empty");

  // Every generated model must be a delete filtering on the full key nested under _id.
  BsonDocument expectedFilter = BsonDocument.parse("{_id: {myKeyField: 'ABCD-1234'}}");
  assertAll(
      "checking all generated WriteModel entries",
      models.stream()
          .map(
              model ->
                  () -> {
                    assertTrue(model instanceof DeleteOneModel);
                    DeleteOneModel<BsonDocument> deleteModel =
                        (DeleteOneModel<BsonDocument>) model;
                    BsonDocument actualFilter =
                        deleteModel.getFilter().toBsonDocument(BsonDocument.class, null);
                    assertEquals(expectedFilter, actualFilter);
                  }));
}
 
源代码18 项目: kafka-connect-mongodb   文件: RdbmsDeleteTest.java
@Test
@DisplayName("when valid cdc event with compound PK then correct DeleteOneModel")
public void testValidSinkDocumentCompoundPK() {
    // Input: a key made of two fields, and a value document with op "d".
    BsonDocument compoundKey = new BsonDocument("idA", new BsonInt32(123))
            .append("idB", new BsonString("ABC"));
    BsonDocument deleteValue = new BsonDocument("op", new BsonString("d"));

    // Expected filter: an equal two-field document nested under the id field.
    BsonDocument expectedFilter = new BsonDocument(
            DBCollection.ID_FIELD_NAME,
            new BsonDocument("idA", new BsonInt32(123))
                    .append("idB", new BsonString("ABC")));

    WriteModel<BsonDocument> produced =
            RDBMS_DELETE.perform(new SinkDocument(compoundKey, deleteValue));
    assertTrue(produced instanceof DeleteOneModel,
            () -> "result expected to be of type DeleteOneModel");

    DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) produced;
    assertTrue(deleteModel.getFilter() instanceof BsonDocument,
            () -> "filter expected to be of type BsonDocument");
    assertEquals(expectedFilter, deleteModel.getFilter());
}
 
源代码19 项目: kafka-connect-mongodb   文件: RdbmsDeleteTest.java
@Test
@DisplayName("when valid cdc event without PK then correct DeleteOneModel")
public void testValidSinkDocumentNoPK() {
    // Input: an empty key document; the value carries the row contents in its "before" field.
    BsonDocument emptyKey = new BsonDocument();
    BsonDocument valueWithBefore = new BsonDocument("op", new BsonString("c"))
            .append("before", new BsonDocument("text", new BsonString("hohoho"))
                    .append("number", new BsonInt32(9876))
                    .append("active", new BsonBoolean(true)));

    // Expected filter: a separately built document equal to the "before" contents.
    BsonDocument expectedFilter = new BsonDocument("text", new BsonString("hohoho"))
            .append("number", new BsonInt32(9876))
            .append("active", new BsonBoolean(true));

    WriteModel<BsonDocument> produced =
            RDBMS_DELETE.perform(new SinkDocument(emptyKey, valueWithBefore));
    assertTrue(produced instanceof DeleteOneModel,
            () -> "result expected to be of type DeleteOneModel");

    DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) produced;
    assertTrue(deleteModel.getFilter() instanceof BsonDocument,
            () -> "filter expected to be of type BsonDocument");
    assertEquals(expectedFilter, deleteModel.getFilter());
}
 
源代码20 项目: stitch-android-sdk   文件: DataSynchronizer.java
/**
 * Prepares the deletion of a single synchronized document by its given id. No deletion is
 * prepared if the _id is not being synchronized.
 *
 * <p>In the code visible here nothing is written directly: the local delete, the local change
 * event and the updated synchronization config are collected in the returned container.
 *
 * @param nsConfig   the namespace synchronization config of the namespace where the document
 *                   lives.
 * @param documentId the _id of the document.
 * @param atVersion  the version document passed to {@code setSomePendingWrites} together with
 *                   the delete event.
 * @return a container holding the document id, a {@link DeleteOneModel} for the local
 *         collection, the local delete change event and a {@link ReplaceOneModel} for the
 *         document's synchronization config; {@code null} if no synchronization config exists
 *         for the document.
 */
@CheckReturnValue
private @Nullable
LocalSyncWriteModelContainer deleteOneFromResolution(
    final NamespaceSynchronizationConfig nsConfig,
    final BsonValue documentId,
    final BsonDocument atVersion
) {
  final MongoNamespace namespace = nsConfig.getNamespace();
  final ChangeEvent<BsonDocument> event;
  // The namespace write lock is held only while the document config is looked up and updated.
  final Lock lock =
      this.syncConfig.getNamespaceConfig(namespace).getLock().writeLock();
  lock.lock();
  final CoreDocumentSynchronizationConfig config;
  try {
    config = syncConfig.getSynchronizedDocument(namespace, documentId);
    if (config == null) {
      // Document is not synchronized: nothing to delete (the lock is released by finally).
      return null;
    }

    event = ChangeEvents.changeEventForLocalDelete(namespace, documentId, true);
    config.setSomePendingWrites(logicalT, atVersion, 0L, event);
  } finally {
    lock.unlock();
  }

  // The container is assembled after the lock has been released.
  final LocalSyncWriteModelContainer container = newWriteModelContainer(nsConfig);

  container.addDocIDs(documentId);
  container.addLocalWrite(new DeleteOneModel<>(getDocumentIdFilter(documentId)));
  container.addLocalChangeEvent(event);
  // Queue a replacement of the stored document config with the config updated above.
  container.addConfigWrite(
      new ReplaceOneModel<>(CoreDocumentSynchronizationConfig.getDocFilter(
        namespace, config.getDocumentId()
      ), config));

  return container;
}
 
源代码21 项目: mongo-kafka   文件: MongoSinkTaskTest.java
@Test
@DisplayName("test build WriteModelCDC for Rdbms Delete")
void testBuildWriteModelCdcForRdbmsDelete() {
  final String topic = "dbserver1.catalogA.tableB";
  // Ids 1234..1238: five records, whose offsets are 0..4 (id minus the first id).
  final int firstId = 1234;
  final int recordCount = 5;

  Schema keySchema = getRdbmsKeySchemaSample();
  Schema valueSchema = getRdbmsValueSchemaSample();

  // Each record has op "d", a populated "before" struct and a null "after".
  List<SinkRecord> deleteRecords =
      IntStream.range(firstId, firstId + recordCount)
          .mapToObj(
              id ->
                  new SinkRecord(
                      topic,
                      0,
                      keySchema,
                      new Struct(keySchema).put("id", id),
                      valueSchema,
                      new Struct(valueSchema)
                          .put("op", "d")
                          .put(
                              "before",
                              new Struct(valueSchema.field("before").schema())
                                  .put("id", id)
                                  .put("first_name", "Alice" + id)
                                  .put("last_name", "in Wonderland")
                                  .put("email", "alice" + id + "@wonder.land"))
                          .put("after", null)
                          .put("source", "ignored"),
                      id - firstId))
          .collect(Collectors.toList());

  MongoSinkTask task = new MongoSinkTask();
  String sinkConfigJson =
      format(
          "{'%s': '%s', '%s': '%s'}",
          TOPICS_CONFIG,
          topic,
          CHANGE_DATA_CAPTURE_HANDLER_CONFIG,
          RdbmsHandler.class.getName());
  MongoSinkTopicConfig topicConfig =
      SinkTestHelper.createSinkConfig(sinkConfigJson).getMongoSinkTopicConfig(topic);

  List<? extends WriteModel> models = task.buildWriteModelCDC(topicConfig, deleteRecords);
  assertNotNull(models, "WriteModel list was null");
  assertFalse(models.isEmpty(), "WriteModel list was empty");

  // The model at position (id - firstId) must be a delete filtering on {_id: {id: <id>}}.
  assertAll(
      "checking all generated WriteModel entries",
      IntStream.range(firstId, firstId + recordCount)
          .mapToObj(
              id ->
                  () -> {
                    int index = id - firstId;
                    WriteModel model = models.get(index);
                    assertNotNull(model, "WriteModel at index " + index + " must not be null");
                    assertTrue(model instanceof DeleteOneModel);

                    DeleteOneModel<BsonDocument> deleteModel =
                        (DeleteOneModel<BsonDocument>) model;
                    BsonDocument actualFilter =
                        deleteModel.getFilter().toBsonDocument(BsonDocument.class, null);
                    assertEquals(
                        BsonDocument.parse(format("{_id: {id: %s}}", id)), actualFilter);
                  }));
}
 
@Test
@DisplayName("test DeleteOneDefaultStrategy with custom config and sink records with keys and null values")
void testBuildDeleteOneModelsCustomConfigSinkRecordsWithKeyAndNullValuePresent() {
    // Task config: full-key id strategy, replace-one write model strategy, delete-on-null "true".
    Map<String,String> taskProps = new HashMap<>();
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_DOCUMENT_ID_STRATEGY_CONF,
            FullKeyStrategy.class.getName());
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_WRITEMODEL_STRATEGY,
            ReplaceOneDefaultStrategy.class.getName());
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_DELETE_ON_NULL_VALUES, "true");
    taskProps.put("topics", "foo");
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_COLLECTIONS_CONF, "foo-collection");
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_COLLECTION_CONF + "." + "foo",
            "foo-collection");

    MongoDbSinkTask task = new MongoDbSinkTask();
    task.start(taskProps);

    // Records are generated with a struct key and no value schema/value configured here.
    Schema structKeySchema = SchemaBuilder.struct()
            .field("myKeyField", Schema.STRING_SCHEMA);
    TopicSettingsAndResults topicSettings =
            new TopicSettingsAndResults("foo", "foo-collection", 10, 0);
    topicSettings.setKeySchema(structKeySchema);
    topicSettings.setKey(new Struct(structKeySchema).put("myKeyField", "ABCD-1234"));

    List<SinkRecord> records = createSinkRecordList(topicSettings);
    List<? extends WriteModel> models = task.buildWriteModel(records, "blah-collection");

    assertNotNull(models, "WriteModel list was null");
    assertFalse(models.isEmpty(), "WriteModel list was empty");

    // Every generated model must be a delete filtering on the full key nested under _id.
    assertAll("checking all generated WriteModel entries",
            models.stream().map(model ->
                    () -> assertAll("assertions for single WriteModel",
                            () -> assertTrue(model instanceof DeleteOneModel),
                            () -> {
                                DeleteOneModel<BsonDocument> deleteModel =
                                        (DeleteOneModel<BsonDocument>) model;
                                BsonDocument actualFilter = deleteModel.getFilter()
                                        .toBsonDocument(BsonDocument.class, null);
                                BsonDocument expectedFilter = new BsonDocument("_id",
                                        new BsonDocument("myKeyField", new BsonString("ABCD-1234")));
                                assertEquals(expectedFilter, actualFilter);
                            }
                    )
            )
    );
}
 
@Test
@DisplayName("test build WriteModelCDC for Rdbms Delete")
void testBuildWriteModelCdcForRdbmsDelete() {
    // Ids 1234..1238: five records, whose offsets are 0..4 (id minus the first id).
    final int firstId = 1234;
    final int recordCount = 5;
    final String collectionName = "dbserver1.catalogA.tableB";

    Schema keySchema = getRdbmsKeySchemaSample();
    Schema valueSchema = getRdbmsValueSchemaSample();

    // Each record has op "d", a populated "before" struct and a null "after".
    List<SinkRecord> deleteRecords = IntStream.range(firstId, firstId + recordCount)
            .mapToObj(id -> new SinkRecord("test-topic", 0,
                    keySchema, new Struct(keySchema)
                    .put("id", id),
                    valueSchema, new Struct(valueSchema)
                    .put("op", "d")
                    .put("before", new Struct(valueSchema.field("before").schema())
                            .put("id", id)
                            .put("first_name", "Alice" + id)
                            .put("last_name", "in Wonderland")
                            .put("email", "alice" + id + "@wonder.land"))
                    .put("after", null)
                    // the "source" field is deliberately left unset: not used at all so far
                    , id - firstId
            ))
            .collect(Collectors.toList());

    // Task config: one topic/collection with the RDBMS change-data-capture handler.
    Map<String,String> taskProps = new HashMap<>();
    taskProps.put("topics", collectionName);
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_COLLECTIONS_CONF, collectionName);
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_COLLECTION_CONF
            + "." + collectionName, collectionName);
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_CHANGE_DATA_CAPTURE_HANDLER
            + "." + collectionName, RdbmsHandler.class.getName());

    MongoDbSinkTask task = new MongoDbSinkTask();
    task.start(taskProps);

    List<? extends WriteModel> models = task.buildWriteModelCDC(deleteRecords, collectionName);

    assertNotNull(models, "WriteModel list was null");
    assertFalse(models.isEmpty(), "WriteModel list was empty");

    // The model at position (id - firstId) must be a delete filtering on {_id: {id: <id>}}.
    assertAll("checking all generated WriteModel entries",
            IntStream.range(firstId, firstId + recordCount).mapToObj(
                    id -> () -> {
                        int index = id - firstId;
                        WriteModel model = models.get(index);
                        assertNotNull(model, "WriteModel at index " + index + " must not be null");
                        assertTrue(model instanceof DeleteOneModel);
                        DeleteOneModel<BsonDocument> deleteModel =
                                (DeleteOneModel<BsonDocument>) model;
                        BsonDocument actualFilter = deleteModel.getFilter()
                                .toBsonDocument(BsonDocument.class, null);
                        BsonDocument expectedFilter = new BsonDocument("_id",
                                new BsonDocument("id", new BsonInt32(id)));
                        assertEquals(expectedFilter, actualFilter);
                    }
            )
    );
}
 
@Test
@DisplayName("when sink document is valid for DeleteOneDefaultStrategy then correct DeleteOneModel")
public void testDeleteOneDefaultStrategyWitValidSinkDocument() {
    // Sink document with a key only; the value document is null.
    SinkDocument keyOnlyDocument =
            new SinkDocument(new BsonDocument("id", new BsonInt32(1004)), null);

    WriteModel<BsonDocument> produced =
            DELETE_ONE_DEFAULT_STRATEGY.createWriteModel(keyOnlyDocument);
    assertTrue(produced instanceof DeleteOneModel,
            () -> "result expected to be of type DeleteOneModel");

    DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) produced;
    assertTrue(deleteModel.getFilter() instanceof BsonDocument,
            () -> "filter expected to be of type BsonDocument");
    assertEquals(FILTER_DOC_DELETE_DEFAULT, deleteModel.getFilter());
}
 
源代码25 项目: ditto   文件: MongoSearchUpdaterFlowTest.java
/**
 * Checks that the updater flow, wrapped in a restart sink, keeps consuming stream elements even
 * though every bulk write against the mocked collection fails.
 *
 * @param errorSupplier supplies the error that the mocked bulk-write publisher signals to each
 *        subscriber.
 * @throws Exception declared broadly; the visible code can at least be interrupted while
 *         waiting on the latch.
 */
@SuppressWarnings("unchecked")
private void testStreamRestart(final Supplier<Throwable> errorSupplier) throws Exception {

    new TestKit(actorSystem) {{

        // GIVEN: The persistence fails with an error on every write

        final MongoDatabase db = Mockito.mock(MongoDatabase.class);
        final MongoCollection<Document> collection = Mockito.mock(MongoCollection.class);
        // Each subscription immediately receives a freshly supplied error.
        final Publisher<BulkWriteResult> publisher = s -> s.onError(errorSupplier.get());
        Mockito.when(db.getCollection(Mockito.any())).thenReturn(collection);
        Mockito.when(collection.bulkWrite(Mockito.any(), Mockito.any(BulkWriteOptions.class)))
                .thenReturn(publisher);

        // GIVEN: MongoSearchUpdaterFlow is wrapped inside a RestartSink

        final MongoSearchUpdaterFlow flow = MongoSearchUpdaterFlow.of(db);

        final Sink<Source<AbstractWriteModel, NotUsed>, ?> sink =
                flow.start(1, 1, Duration.ZERO).to(Sink.ignore());

        final Sink<Source<AbstractWriteModel, NotUsed>, ?> restartSink =
                RestartSink.withBackoff(Duration.ZERO, Duration.ZERO, 1.0, () -> sink);

        // WHEN: Many changes stream through MongoSearchUpdaterFlow

        final int numberOfChanges = 25;
        // Counted down once per element passing the map stage below.
        final CountDownLatch latch = new CountDownLatch(numberOfChanges);

        final AbstractWriteModel abstractWriteModel = Mockito.mock(AbstractWriteModel.class);
        final WriteModel<Document> mongoWriteModel = new DeleteOneModel<>(new Document());
        Mockito.when(abstractWriteModel.toMongo()).thenReturn(mongoWriteModel);
        Source.repeat(Source.single(abstractWriteModel))
                .take(numberOfChanges)
                .buffer(1, OverflowStrategy.backpressure())
                .map(source -> {
                    latch.countDown();
                    return source;
                })
                .runWith(restartSink, ActorMaterializer.create(actorSystem));

        // THEN: MongoSearchUpdaterFlow should keep restarting and keep consuming changes from the stream

        // The boolean result of await is ignored; a timeout is detected by the count assertion below.
        latch.await(5L, TimeUnit.SECONDS);
        assertThat(latch.getCount()).isZero();
    }};
}
 
源代码26 项目: aion   文件: MongoDB.java
/**
 * Counts the edits in this batch that are {@link DeleteOneModel} instances.
 *
 * @return Number of deletes
 */
public long getDeleteCount() {
    return this.edits.stream()
            .filter(DeleteOneModel.class::isInstance)
            .count();
}
 
 类所在包
 类方法
 同包方法