类com.mongodb.client.model.UpdateOneModel源码实例Demo

下面列出了 com.mongodb.client.model.UpdateOneModel 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: mongo-kafka   文件: UpdateOneTimestampsStrategy.java
/**
 * Builds an {@link UpdateOneModel} from the value document of the given sink document.
 *
 * <p>The filter matches on {@code ID_FIELD}. The update {@code $set}s the value document
 * (with the modified timestamp appended to it) and {@code $setOnInsert}s the inserted
 * timestamp. Both timestamps carry the same instant, captured once per call.
 *
 * @param document the sink document whose value document is written
 * @return the write model, configured with {@code UPDATE_OPTIONS}
 * @throws DataException if the sink document has no value document
 */
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
  final BsonDocument valueDoc =
      document
          .getValueDoc()
          .orElseThrow(
              () ->
                  new DataException(
                      "Error: cannot build the WriteModel since the value document was missing unexpectedly"));

  // One timestamp is shared by both fields so they are equal on first insert.
  final BsonDateTime now = new BsonDateTime(Instant.now().toEpochMilli());

  final BsonDocument filter = new BsonDocument(ID_FIELD, valueDoc.get(ID_FIELD));

  // append() returns the value document itself, so the whole document ends up under $set.
  final BsonDocument update =
      new BsonDocument("$set", valueDoc.append(FIELD_NAME_MODIFIED_TS, now));
  update.append("$setOnInsert", new BsonDocument(FIELD_NAME_INSERTED_TS, now));

  return new UpdateOneModel<>(filter, update, UPDATE_OPTIONS);
}
 
源代码2 项目: mongo-kafka   文件: MongoDbUpdateTest.java
/**
 * Checks that an update event ("op" = "u" with a JSON "patch") is converted into an
 * {@link UpdateOneModel} carrying the expected update and filter documents.
 */
@Test
@DisplayName("when valid doc change cdc event then correct UpdateOneModel")
void testValidSinkDocumentForUpdate() {
  BsonDocument key = BsonDocument.parse("{id: '1234'}");
  BsonDocument value = new BsonDocument("op", new BsonString("u"));
  // The patch is carried as a JSON string, not as an embedded document.
  value.append("patch", new BsonString(UPDATE_DOC.toJson()));
  SinkDocument sinkDocument = new SinkDocument(key, value);

  WriteModel<BsonDocument> model = UPDATE.perform(sinkDocument);
  assertTrue(model instanceof UpdateOneModel, "result expected to be of type UpdateOneModel");

  UpdateOneModel<BsonDocument> updateModel = (UpdateOneModel<BsonDocument>) model;
  assertEquals(UPDATE_DOC, updateModel.getUpdate(), "update doc not matching what is expected");
  assertTrue(
      updateModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC, updateModel.getFilter());
}
 
源代码3 项目: mongo-kafka   文件: MongoDbUpdateTest.java
/**
 * Checks that an update event whose patch is {@code UPDATE_DOC_WITH_OPLOG_INTERNALS} still
 * yields an {@link UpdateOneModel} whose update equals {@code UPDATE_DOC}.
 */
@Test
@DisplayName(
    "when valid doc change cdc event containing internal oplog fields then correct UpdateOneModel")
public void testValidSinkDocumentWithInternalOploagFieldForUpdate() {
  BsonDocument key = BsonDocument.parse("{id: '1234'}");
  BsonDocument value = new BsonDocument("op", new BsonString("u"));
  value.append("patch", new BsonString(UPDATE_DOC_WITH_OPLOG_INTERNALS.toJson()));
  SinkDocument sinkDocument = new SinkDocument(key, value);

  WriteModel<BsonDocument> model = UPDATE.perform(sinkDocument);
  assertTrue(
      model instanceof UpdateOneModel, () -> "result expected to be of type UpdateOneModel");

  UpdateOneModel<BsonDocument> updateModel = (UpdateOneModel<BsonDocument>) model;
  // The expected update is the plain UPDATE_DOC, not the variant sent in the patch.
  assertEquals(
      UPDATE_DOC, updateModel.getUpdate(), () -> "update doc not matching what is expected");
  assertTrue(
      updateModel.getFilter() instanceof BsonDocument,
      () -> "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC, updateModel.getFilter());
}
 
源代码4 项目: aion   文件: MongoDB.java
/**
 * Adds a new edit to the batch.
 *
 * <p>A {@code null} value queues a delete of the document whose id field equals {@code key};
 * any other value queues an upsert that sets the value field of that document.
 *
 * @param key the key to write
 * @param value the value to write. Null indicates we should delete this key
 * @return this
 */
public WriteBatch addEdit(byte[] key, byte[] value) {
    // The models are created directly in the add(...) call so the diamond infers their type
    // argument from the element type of `edits`, instead of going through raw-typed locals.
    if (value == null) {
        edits.add(
                new DeleteOneModel<>(eq(MongoConstants.ID_FIELD_NAME, new BsonBinary(key))));
    } else {
        edits.add(
                new UpdateOneModel<>(
                        eq(MongoConstants.ID_FIELD_NAME, new BsonBinary(key)),
                        Updates.set(MongoConstants.VALUE_FIELD_NAME, new BsonBinary(value)),
                        new UpdateOptions().upsert(true)));
    }

    return this;
}
 
源代码5 项目: quarkus   文件: CollectionManagementTest.java
/**
 * Runs a mixed bulk write (three inserts, one update, one delete, one replace) against the
 * "test" collection and checks the deleted and inserted counts of the result.
 */
@Test
void bulkWrite() {
    ReactiveMongoCollection<Document> testCollection =
            client.getDatabase(DATABASE).getCollection("test");

    BulkWriteResult outcome = testCollection
            .bulkWrite(Arrays.asList(
                    new InsertOneModel<>(new Document("_id", 4)),
                    new InsertOneModel<>(new Document("_id", 5)),
                    new InsertOneModel<>(new Document("_id", 6)),
                    new UpdateOneModel<>(
                            new Document("_id", 1),
                            new Document("$set", new Document("x", 2))),
                    new DeleteOneModel<>(new Document("_id", 2)),
                    new ReplaceOneModel<>(
                            new Document("_id", 3),
                            new Document("_id", 3).append("x", 4))))
            .await()
            .indefinitely();

    // Only the three inserts are expected to be counted; the delete is expected to remove nothing.
    assertThat(outcome.getDeletedCount()).isEqualTo(0);
    assertThat(outcome.getInsertedCount()).isEqualTo(3);

}
 
源代码6 项目: quarkus   文件: CollectionManagementTest.java
/**
 * Same mixed bulk write as the option-less variant, but submitted with ordered
 * {@link BulkWriteOptions}; checks the deleted and inserted counts of the result.
 */
@Test
void bulkWriteWithOptions() {
    ReactiveMongoCollection<Document> testCollection =
            client.getDatabase(DATABASE).getCollection("test");

    BulkWriteResult outcome = testCollection
            .bulkWrite(
                    Arrays.asList(
                            new InsertOneModel<>(new Document("_id", 4)),
                            new InsertOneModel<>(new Document("_id", 5)),
                            new InsertOneModel<>(new Document("_id", 6)),
                            new UpdateOneModel<>(
                                    new Document("_id", 1),
                                    new Document("$set", new Document("x", 2))),
                            new DeleteOneModel<>(new Document("_id", 2)),
                            new ReplaceOneModel<>(
                                    new Document("_id", 3),
                                    new Document("_id", 3).append("x", 4))),
                    new BulkWriteOptions().ordered(true))
            .await()
            .indefinitely();

    // Only the three inserts are expected to be counted; the delete is expected to remove nothing.
    assertThat(outcome.getDeletedCount()).isEqualTo(0);
    assertThat(outcome.getInsertedCount()).isEqualTo(3);

}
 
/**
 * Builds an {@link UpdateOneModel} from the value document of the given sink document.
 *
 * <p>The filter matches on {@code DBCollection.ID_FIELD_NAME}. The update {@code $set}s the
 * value document (with the modified timestamp appended to it) and {@code $setOnInsert}s the
 * inserted timestamp; both timestamps hold the same instant.
 *
 * @param document the sink document whose value document is written
 * @return the write model, configured with {@code UPDATE_OPTIONS}
 * @throws DataException if the sink document has no value document
 */
@Override
public WriteModel<BsonDocument> createWriteModel(SinkDocument document) {

    final BsonDocument valueDoc = document.getValueDoc().orElseThrow(
            () -> new DataException("error: cannot build the WriteModel since"
                    + " the value document was missing unexpectedly")
    );

    // Captured once so that modified and inserted timestamps match on first insert.
    final BsonDateTime now = new BsonDateTime(Instant.now().toEpochMilli());

    final BsonDocument filter = new BsonDocument(
            DBCollection.ID_FIELD_NAME, valueDoc.get(DBCollection.ID_FIELD_NAME));

    // append() returns the value document itself, so the whole document ends up under $set.
    final BsonDocument update =
            new BsonDocument("$set", valueDoc.append(FIELDNAME_MODIFIED_TS, now));
    update.append("$setOnInsert", new BsonDocument(FIELDNAME_INSERTED_TS, now));

    return new UpdateOneModel<>(filter, update, UPDATE_OPTIONS);

}
 
源代码8 项目: kafka-connect-mongodb   文件: MongoDbUpdateTest.java
/**
 * Checks that an update event ("op" = "u" with a JSON "patch") is converted into an
 * {@link UpdateOneModel} carrying the expected update and filter documents.
 */
@Test
@DisplayName("when valid doc change cdc event then correct UpdateOneModel")
public void testValidSinkDocumentForUpdate() {
    BsonDocument key = new BsonDocument("id", new BsonString("1004"));

    // The patch is carried as a JSON string, not as an embedded document.
    BsonDocument value = new BsonDocument("op", new BsonString("u"));
    value.append("patch", new BsonString(UPDATE_DOC.toJson()));

    SinkDocument sinkDocument = new SinkDocument(key, value);
    WriteModel<BsonDocument> model = MONGODB_UPDATE.perform(sinkDocument);

    assertTrue(model instanceof UpdateOneModel,
            () -> "result expected to be of type UpdateOneModel");

    UpdateOneModel<BsonDocument> updateModel = (UpdateOneModel<BsonDocument>) model;

    assertEquals(UPDATE_DOC, updateModel.getUpdate(),
            () -> "update doc not matching what is expected");

    assertTrue(updateModel.getFilter() instanceof BsonDocument,
            () -> "filter expected to be of type BsonDocument");

    assertEquals(FILTER_DOC, updateModel.getFilter());

}
 
源代码9 项目: kafka-connect-mongodb   文件: MongoDbUpdateTest.java
/**
 * Checks that an update event whose patch is {@code UPDATE_DOC_WITH_OPLOG_INTERNALS} still
 * yields an {@link UpdateOneModel} whose update equals {@code UPDATE_DOC}.
 */
@Test
@DisplayName("when valid doc change cdc event containing internal oplog fields then correct UpdateOneModel")
public void testValidSinkDocumentWithInternalOploagFieldForUpdate() {
    BsonDocument key = new BsonDocument("id", new BsonString("1004"));

    BsonDocument value = new BsonDocument("op", new BsonString("u"));
    value.append("patch", new BsonString(UPDATE_DOC_WITH_OPLOG_INTERNALS.toJson()));

    SinkDocument sinkDocument = new SinkDocument(key, value);
    WriteModel<BsonDocument> model = MONGODB_UPDATE.perform(sinkDocument);

    assertTrue(model instanceof UpdateOneModel,
            () -> "result expected to be of type UpdateOneModel");

    UpdateOneModel<BsonDocument> updateModel = (UpdateOneModel<BsonDocument>) model;

    // The expected update is the plain UPDATE_DOC, not the variant sent in the patch.
    assertEquals(UPDATE_DOC, updateModel.getUpdate(),
            () -> "update doc not matching what is expected");

    assertTrue(updateModel.getFilter() instanceof BsonDocument,
            () -> "filter expected to be of type BsonDocument");

    assertEquals(FILTER_DOC, updateModel.getFilter());

}
 
/**
 * Builds an {@link UpdateOneModel} whose filter is the business key stored as a document
 * under {@code ID_FIELD} in the value document.
 *
 * <p>{@code ID_FIELD} is removed from the value document before it is placed under
 * {@code $set} together with the modified timestamp; the inserted timestamp goes under
 * {@code $setOnInsert}. Both timestamps hold the same instant.
 *
 * @param document the sink document whose value document is written
 * @return the write model, configured with {@code UPDATE_OPTIONS}
 * @throws DataException if the value document is missing, or if reading {@code ID_FIELD} as
 *     a document fails with a {@code BSONException}
 */
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
  BsonDocument vd =
      document
          .getValueDoc()
          .orElseThrow(
              () ->
                  new DataException(
                      "Error: cannot build the WriteModel since the value document was missing unexpectedly"));

  // Captured once so that modified and inserted timestamps match on first insert.
  BsonDateTime dateTime = new BsonDateTime(Instant.now().toEpochMilli());

  try {
    // The business key becomes the filter and must not be part of the $set payload.
    BsonDocument businessKey = vd.getDocument(ID_FIELD);
    vd.remove(ID_FIELD);

    return new UpdateOneModel<>(
        businessKey,
        new BsonDocument("$set", vd.append(FIELD_NAME_MODIFIED_TS, dateTime))
            .append("$setOnInsert", new BsonDocument(FIELD_NAME_INSERTED_TS, dateTime)),
        UPDATE_OPTIONS);

  } catch (BSONException e) {
    // Keep the original exception as the cause so the underlying BSON error is not lost.
    throw new DataException(
        "Error: cannot build the WriteModel since the value document does not contain an _id field of"
            + " type BsonDocument which holds the business key fields",
        e);
  }
}
 
源代码11 项目: mongo-kafka   文件: WriteModelStrategyTest.java
/**
 * Checks the write model produced by {@code UPDATE_ONE_TIMESTAMPS_STRATEGY} for a valid value
 * document: model type, timestamp fields, filter and upsert flag.
 */
@Test
@DisplayName(
    "when sink document is valid for UpdateOneTimestampsStrategy then correct UpdateOneModel")
void testUpdateOneTimestampsStrategyWithValidSinkDocument() {
  BsonDocument value =
      BsonDocument.parse("{_id: 1234, first_name: 'Grace', last_name: 'Hopper', active: false}");
  SinkDocument sinkDocument = new SinkDocument(null, value);

  WriteModel<BsonDocument> model = UPDATE_ONE_TIMESTAMPS_STRATEGY.createWriteModel(sinkDocument);
  assertTrue(model instanceof UpdateOneModel, "result expected to be of type UpdateOneModel");

  UpdateOneModel<BsonDocument> updateModel = (UpdateOneModel<BsonDocument>) model;

  // The exact timestamp value cannot be asserted here. What can be checked is that both
  // fields exist, are of type BsonDateTime (getDateTime fails otherwise), and are equal.
  BsonDocument update = (BsonDocument) updateModel.getUpdate();
  BsonDocument setPart = update.getDocument("$set");
  BsonDateTime modified = setPart.getDateTime(UpdateOneTimestampsStrategy.FIELD_NAME_MODIFIED_TS);
  BsonDocument setOnInsertPart = update.getDocument("$setOnInsert");
  BsonDateTime inserted =
      setOnInsertPart.getDateTime(UpdateOneTimestampsStrategy.FIELD_NAME_INSERTED_TS);

  assertEquals(inserted, modified, "modified and inserted timestamps must initially be equal");
  assertTrue(
      updateModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC_UPDATE_TIMESTAMPS, updateModel.getFilter());
  assertTrue(updateModel.getOptions().isUpsert(), "update expected to be done in upsert mode");
}
 
源代码12 项目: mongo-kafka   文件: WriteModelStrategyTest.java
/**
 * Checks model type, timestamp fields, filter and upsert flag of the write model built for a
 * valid value document.
 *
 * <p>NOTE(review): despite the method and display names, the model is created by
 * {@code UPDATE_ONE_TIMESTAMPS_STRATEGY} and compared against
 * {@code FILTER_DOC_UPDATE_TIMESTAMPS}; only the field-name constants come from
 * {@code UpdateOneBusinessKeyTimestampStrategy}. Confirm whether a business-key strategy
 * instance was intended here.
 */
@Test
@DisplayName(
    "when sink document is valid for UpdateOneBusinessKeyTimestampStrategy then correct UpdateOneModel")
void testUpdateOneBusinessKeyTimestampsStrategyWithValidSinkDocument() {
  BsonDocument value =
      BsonDocument.parse("{_id: 1234, first_name: 'Grace', last_name: 'Hopper', active: false}");
  SinkDocument sinkDocument = new SinkDocument(null, value);

  WriteModel<BsonDocument> model = UPDATE_ONE_TIMESTAMPS_STRATEGY.createWriteModel(sinkDocument);
  assertTrue(model instanceof UpdateOneModel, "result expected to be of type UpdateOneModel");

  UpdateOneModel<BsonDocument> updateModel = (UpdateOneModel<BsonDocument>) model;

  // The exact timestamp value cannot be asserted here. What can be checked is that both
  // fields exist, are of type BsonDateTime (getDateTime fails otherwise), and are equal.
  BsonDocument update = (BsonDocument) updateModel.getUpdate();
  BsonDocument setPart = update.getDocument("$set");
  BsonDateTime modified =
      setPart.getDateTime(UpdateOneBusinessKeyTimestampStrategy.FIELD_NAME_MODIFIED_TS);
  BsonDocument setOnInsertPart = update.getDocument("$setOnInsert");
  BsonDateTime inserted =
      setOnInsertPart.getDateTime(UpdateOneBusinessKeyTimestampStrategy.FIELD_NAME_INSERTED_TS);

  assertEquals(inserted, modified, "modified and inserted timestamps must initially be equal");
  assertTrue(
      updateModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC_UPDATE_TIMESTAMPS, updateModel.getFilter());
  assertTrue(updateModel.getOptions().isUpsert(), "update expected to be done in upsert mode");
}
 
源代码13 项目: ditto   文件: ThingDeleteModel.java
/**
 * Converts this model into a MongoDB update that sets {@code FIELD_DELETE_AT} to a
 * {@code BsonDateTime} of 0 on the document matched by {@link #getFilter()}.
 *
 * @return an {@link UpdateOneModel} with document validation bypassed
 */
@Override
public WriteModel<Document> toMongo() {
    final Bson filter = getFilter();

    final BsonDocument deleteAtValue = new BsonDocument();
    deleteAtValue.append(FIELD_DELETE_AT, new BsonDateTime(0L));
    final Bson update = new BsonDocument().append(SET, deleteAtValue);

    return new UpdateOneModel<>(
            filter, update, new UpdateOptions().bypassDocumentValidation(true));
}
 
源代码14 项目: kafka-connect-mongodb   文件: MongodbSinkTask.java
/**
 * Put the records in the sink.
 *
 * <p>Records are processed in iteration order and grouped into batches of at most
 * {@code bulkSize} records. Within a batch the writes are grouped by topic, and one bulk
 * write per topic is issued against the collection registered for that topic in
 * {@code mapping}. Each record becomes an upsert keyed by its Kafka offset.
 *
 * <p>NOTE(review): the {@code _id} is the Kafka offset alone, so records from different
 * partitions of the same topic that share an offset would target the same document —
 * confirm this is intended.
 *
 * @param collection the set of records to send.
 */
@Override
public void put(Collection<SinkRecord> collection) {
    Map<String, List<WriteModel<Document>>> bulks = new HashMap<>();
    int pending = 0;

    for (SinkRecord record : collection) {
        Map<String, Object> jsonMap = SchemaUtils.toJsonMap((Struct) record.value());
        String topic = record.topic();

        List<WriteModel<Document>> topicBulk = bulks.get(topic);
        if (topicBulk == null) {
            topicBulk = new ArrayList<>();
            bulks.put(topic, topicBulk);
        }

        Document newDocument = new Document(jsonMap)
                .append("_id", record.kafkaOffset());

        // Pass the document itself so it is only rendered when trace logging is enabled.
        log.trace("Adding to bulk: {}", newDocument);
        topicBulk.add(new UpdateOneModel<Document>(
                Filters.eq("_id", record.kafkaOffset()),
                new Document("$set", newDocument),
                new UpdateOptions().upsert(true)));

        // Flush once the batch is full. A non-positive bulkSize flushes after every
        // record instead of never making progress.
        if (++pending >= bulkSize) {
            executeBulks(bulks);
            bulks = new HashMap<>();
            pending = 0;
        }
    }

    // Flush the trailing, partially filled batch.
    if (pending > 0) {
        executeBulks(bulks);
    }
}

/**
 * Executes one bulk write per topic. A failure for one topic is logged and does not prevent
 * the remaining topics from being written (best effort, as before).
 *
 * @param bulks the pending write models, keyed by topic
 */
private void executeBulks(Map<String, List<WriteModel<Document>>> bulks) {
    log.trace("Executing bulk");
    for (Map.Entry<String, List<WriteModel<Document>>> entry : bulks.entrySet()) {
        try {
            mapping.get(entry.getKey()).bulkWrite(entry.getValue());
        } catch (Exception e) {
            // Log the topic and the full exception rather than only its message.
            log.error("Bulk write for topic {} failed", entry.getKey(), e);
        }
    }
}
 
/**
 * Builds an {@link UpdateOneModel} whose update is a single-stage {@code $replaceRoot}
 * pipeline that only replaces the stored document when the incoming record is newer.
 *
 * <p>The record's topic, partition and offset are first appended to the value document under
 * {@code FIELD_KAFKA_COORDS}. The pipeline then keeps the stored document ({@code $$ROOT})
 * when its coordinates have the same topic and partition and an offset greater than or equal
 * to the record's offset; otherwise the value document becomes the new root.
 *
 * @param document the sink document whose value document is written
 * @param record the sink record supplying topic, partition and offset
 * @return the write model, filtered on {@code DBCollection.ID_FIELD_NAME} and configured
 *     with {@code UPDATE_OPTIONS}
 * @throws DataException if the sink document has no value document
 */
@Override
public WriteModel<BsonDocument> createWriteModel(SinkDocument document, SinkRecord record) {

    final BsonDocument valueDoc = document.getValueDoc().orElseThrow(
            () -> new DataException("error: cannot build the WriteModel since"
                    + " the value document was missing unexpectedly")
    );

    final BsonString topic = new BsonString(record.topic());
    final BsonInt32 partition = new BsonInt32(record.kafkaPartition());
    final BsonInt64 offset = new BsonInt64(record.kafkaOffset());

    // 1) Add the Kafka coordinates to the value document.
    // NOTE: the field names are pre-defined for now; future versions might make them
    // configurable via external configuration properties.
    final BsonDocument coords = new BsonDocument(FIELD_TOPIC, topic)
            .append(FIELD_PARTITION, partition)
            .append(FIELD_OFFSET, offset);
    valueDoc.append(FIELD_KAFKA_COORDS, coords);

    // 2) Build the conditional update pipeline based on the Kafka coordinates. When records
    // get replayed - e.g. due to uncommitted offsets or newly started connectors with
    // different names - stale data must never overwrite newer data already in the sink.
    final String storedCoordsPrefix = "$$ROOT." + FIELD_KAFKA_COORDS + ".";

    final BsonDocument sameTopic = new BsonDocument("$eq", new BsonArray(Arrays.asList(
            new BsonString(storedCoordsPrefix + FIELD_TOPIC), topic)));
    final BsonDocument samePartition = new BsonDocument("$eq", new BsonArray(Arrays.asList(
            new BsonString(storedCoordsPrefix + FIELD_PARTITION), partition)));
    final BsonDocument storedOffsetNotOlder = new BsonDocument("$gte", new BsonArray(Arrays.asList(
            new BsonString(storedCoordsPrefix + FIELD_OFFSET), offset)));

    final BsonDocument condition = new BsonDocument("if",
            new BsonDocument("$and", new BsonArray(
                    Arrays.asList(sameTopic, samePartition, storedOffsetNotOlder))))
            .append("then", new BsonString("$$ROOT"))
            .append("else", valueDoc);

    final List<Bson> conditionalUpdatePipeline = new ArrayList<>();
    conditionalUpdatePipeline.add(new BsonDocument("$replaceRoot",
            new BsonDocument("newRoot", new BsonDocument("$cond", condition))));

    final BsonDocument filter = new BsonDocument(
            DBCollection.ID_FIELD_NAME, valueDoc.get(DBCollection.ID_FIELD_NAME));

    return new UpdateOneModel<>(filter, conditionalUpdatePipeline, UPDATE_OPTIONS);
}
 
/**
 * Checks the write model produced by {@code UPDATE_ONE_TIMESTAMPS_STRATEGY} for a valid value
 * document: model type, timestamp fields, filter and upsert flag.
 */
@Test
@DisplayName("when sink document is valid for UpdateOneTimestampsStrategy then correct UpdateOneModel")
public void  testUpdateOneTimestampsStrategyWithValidSinkDocument() {

    BsonDocument value = new BsonDocument(DBCollection.ID_FIELD_NAME, new BsonInt32(1004));
    value.append("first_name", new BsonString("Anne"));
    value.append("last_name", new BsonString("Kretchmar"));
    value.append("email", new BsonString("[email protected]"));

    SinkDocument sinkDocument = new SinkDocument(null, value);
    WriteModel<BsonDocument> model =
            UPDATE_ONE_TIMESTAMPS_STRATEGY.createWriteModel(sinkDocument);

    assertTrue(model instanceof UpdateOneModel,
            () -> "result expected to be of type UpdateOneModel");

    UpdateOneModel<BsonDocument> updateModel = (UpdateOneModel<BsonDocument>) model;

    // The exact timestamp value cannot be asserted here. What can be checked is that both
    // fields exist, are of type BsonDateTime (getDateTime fails otherwise), and are equal.
    BsonDocument update = (BsonDocument) updateModel.getUpdate();

    BsonDocument setPart = update.getDocument("$set");
    BsonDateTime modified =
            setPart.getDateTime(UpdateOneTimestampsStrategy.FIELDNAME_MODIFIED_TS);
    BsonDocument setOnInsertPart = update.getDocument("$setOnInsert");
    BsonDateTime inserted =
            setOnInsertPart.getDateTime(UpdateOneTimestampsStrategy.FIELDNAME_INSERTED_TS);

    assertTrue(inserted.equals(modified),
            () -> "modified and inserted timestamps must initially be equal");

    assertTrue(updateModel.getFilter() instanceof BsonDocument,
            () -> "filter expected to be of type BsonDocument");

    assertEquals(FILTER_DOC_UPDATE_TIMESTAMPS, updateModel.getFilter());

    assertTrue(updateModel.getOptions().isUpsert(),
            () -> "update expected to be done in upsert mode");

}
 
/**
 * Checks the write model produced by {@code MONOTONIC_WRITES_DEFAULT_STRATEGY}: the filter,
 * the single {@code $replaceRoot} pipeline stage with its three {@code if} clauses (topic,
 * partition, offset), the {@code then}/{@code else} branches, and the upsert flag.
 */
@Test
@DisplayName("when sink document and sink record is valid for MonotonicWritesDefaultStrategy then correct UpdateOneModel")
public void testMonotonicWritesDefaultStrategyWithValidSinkDocumentAndSinkRecord() {

    BsonDocument value = new BsonDocument(DBCollection.ID_FIELD_NAME, new BsonInt32(1004));
    value.append("first_name", new BsonString("Anne"));
    value.append("last_name", new BsonString("Kretchmar"));
    value.append("email", new BsonString("[email protected]"));

    SinkRecord sinkRecord = new SinkRecord("some-topic", 1, null, null, null, null, 111);

    WriteModel<BsonDocument> model = MONOTONIC_WRITES_DEFAULT_STRATEGY
            .createWriteModel(new SinkDocument(null, value), sinkRecord);

    assertTrue(model instanceof UpdateOneModel,
            () -> "result expected to be of type UpdateOneModel");

    UpdateOneModel<BsonDocument> updateModel = (UpdateOneModel<BsonDocument>) model;

    assertTrue(updateModel.getFilter() instanceof BsonDocument,
            () -> "filter expected to be of type BsonDocument");

    assertEquals(FILTER_DOC_MONOTONIC_WRITES_DEFAULT, updateModel.getFilter());

    List<? extends Bson> pipeline = updateModel.getUpdatePipeline();

    assertNotNull(pipeline, () -> "update pipeline must not be null");
    assertEquals(1, pipeline.size(),
            () -> "update pipeline expected to contain exactly 1 element only");

    // Navigate $replaceRoot -> newRoot -> $cond to reach the conditional expression.
    BsonDocument stage = (BsonDocument) pipeline.get(0);
    BsonDocument replaceRoot = stage.get("$replaceRoot").asDocument();
    BsonDocument condDoc = replaceRoot.get("newRoot").asDocument().get("$cond").asDocument();
    BsonDocument ifDoc = condDoc.get("if").asDocument();

    String coordsPrefix = "$$ROOT." + MonotonicWritesDefaultStrategy.FIELD_KAFKA_COORDS + ".";

    // Clause 1: stored topic must equal the record's topic.
    BsonDocument topicClause = ifDoc.get("$and").asArray().get(0).asDocument();
    String topicPath = topicClause.get("$eq").asArray().get(0).asString().getValue();
    String topicValue = topicClause.get("$eq").asArray().get(1).asString().getValue();

    assertEquals(coordsPrefix + MonotonicWritesDefaultStrategy.FIELD_TOPIC, topicPath);
    assertEquals(sinkRecord.topic(), topicValue);

    // Clause 2: stored partition must equal the record's partition.
    BsonDocument partitionClause = ifDoc.get("$and").asArray().get(1).asDocument();
    String partitionPath = partitionClause.get("$eq").asArray().get(0).asString().getValue();
    Integer partitionValue = partitionClause.get("$eq").asArray().get(1).asInt32().getValue();

    assertEquals(coordsPrefix + MonotonicWritesDefaultStrategy.FIELD_PARTITION, partitionPath);
    assertEquals(sinkRecord.kafkaPartition(), partitionValue);

    // Clause 3: stored offset must be greater than or equal to the record's offset.
    BsonDocument offsetClause = ifDoc.get("$and").asArray().get(2).asDocument();
    String offsetPath = offsetClause.get("$gte").asArray().get(0).asString().getValue();
    Long offsetValue = offsetClause.get("$gte").asArray().get(1).asInt64().getValue();

    assertEquals(coordsPrefix + MonotonicWritesDefaultStrategy.FIELD_OFFSET, offsetPath);
    assertEquals(sinkRecord.kafkaOffset(), offsetValue);

    String thenBranch = condDoc.get("then").asString().getValue();

    assertEquals("$$ROOT", thenBranch,
            () -> "conditional then branch must be $$ROOT");

    BsonDocument elseBranch = condDoc.get("else").asDocument();

    assertEquals(MONOTONIC_WRITES_DOC_DEFAULT, elseBranch,
            () -> "conditional else branch contains wrong update document");

    assertTrue(updateModel.getOptions().isUpsert(),
            () -> "replacement expected to be done in upsert mode");

}
 
源代码18 项目: stitch-android-sdk   文件: DataSynchronizer.java
/**
 * Applies a remote update description to a single synchronized document, identified by its
 * id, and returns the writes that result from it.
 *
 * <p>Returns {@code null} when the update description is empty or when no synchronized
 * document is configured for the id. Otherwise the document's config is marked as having its
 * pending writes complete (under the namespace write lock) and a container is returned that
 * always holds the config replacement and, if the sanitized update description is non-empty,
 * also the local change event and the local update.
 *
 * @param nsConfig  the namespace synchronization config of the namespace where the document
 *                  lives.
 * @param documentId the _id of the document.
 * @param updateDescription the update description to apply locally.
 * @param atVersion the Stitch sync version that should be written locally for this update.
 * @param withHash the FNV-1a hash of the sanitized document.
 * @return the container with the resulting writes, or {@code null} if there is nothing to do
 */
@CheckReturnValue
private LocalSyncWriteModelContainer updateOneFromRemote(
    final NamespaceSynchronizationConfig nsConfig,
    final BsonValue documentId,
    final UpdateDescription updateDescription,
    final BsonDocument atVersion,
    final long withHash
) {
  // a completely empty update is a no-op
  if (updateDescription.isEmpty()) {
    return null;
  }

  final UpdateDescription sanitized = sanitizeUpdateDescription(updateDescription);
  final MongoNamespace namespace = nsConfig.getNamespace();

  final CoreDocumentSynchronizationConfig docConfig;
  final Lock writeLock =
      this.syncConfig.getNamespaceConfig(namespace).getLock().writeLock();
  writeLock.lock();
  try {
    docConfig = syncConfig.getSynchronizedDocument(namespace, documentId);
    if (docConfig == null) {
      return null;
    }

    docConfig.setPendingWritesComplete(withHash, atVersion);
  } finally {
    writeLock.unlock();
  }

  final LocalSyncWriteModelContainer writes = newWriteModelContainer(nsConfig);

  // the change event and the local write are only produced when the
  // sanitized update description still contains something to apply
  if (!sanitized.isEmpty()) {
    writes.addDocIDs(documentId);

    final ChangeEvent<BsonDocument> localEvent = ChangeEvents.changeEventForLocalUpdate(
        namespace,
        documentId,
        sanitized,
        null,
        false
    );
    writes.addLocalChangeEvent(localEvent);

    // deliberately not an upsert: a missing document means we are out of
    // date on it, and an update change event cannot be applied as an upsert
    writes.addLocalWrite(new UpdateOneModel<>(
        getDocumentIdFilter(documentId),
        sanitized.toUpdateDocument(),
        new UpdateOptions().upsert(false)
    ));
  }

  // the config write is recorded whether or not a local write was added
  writes.addConfigWrite(new ReplaceOneModel<>(
      CoreDocumentSynchronizationConfig.getDocFilter(namespace, docConfig.getDocumentId()),
      docConfig
  ));

  return writes;
}
 
源代码19 项目: aion   文件: MongoDB.java
/**
 * Counts the entries of this batch that are {@link UpdateOneModel} instances, i.e. the
 * updates (edits or inserts).
 *
 * @return Number of updates
 */
public long getUpdateCount() {
    return this.edits.stream()
            .filter(UpdateOneModel.class::isInstance)
            .count();
}
 
 类所在包
 同包方法