下面列出了 com.mongodb.client.model.UpdateOneModel 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Builds an {@link UpdateOneModel} that writes the whole value document via {@code $set} and
 * stamps it with a modification timestamp, plus an insertion timestamp via {@code $setOnInsert}.
 *
 * <p>Both timestamps are taken from the same instant. Note that the value document held by
 * {@code document} is mutated: the modified-timestamp field is appended to it.
 *
 * @param document the sink document whose value document is written
 * @return the update model, filtered on the value document's id field
 * @throws DataException if the sink document carries no value document
 */
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
  BsonDocument valueDoc =
      document
          .getValueDoc()
          .orElseThrow(
              () ->
                  new DataException(
                      "Error: cannot build the WriteModel since the value document was missing unexpectedly"));

  // One shared instant so that both timestamp fields are equal on first insert.
  BsonDateTime now = new BsonDateTime(Instant.now().toEpochMilli());

  BsonDocument filter = new BsonDocument(ID_FIELD, valueDoc.get(ID_FIELD));
  BsonDocument update =
      new BsonDocument("$set", valueDoc.append(FIELD_NAME_MODIFIED_TS, now));
  update.append("$setOnInsert", new BsonDocument(FIELD_NAME_INSERTED_TS, now));

  return new UpdateOneModel<>(filter, update, UPDATE_OPTIONS);
}
@Test
@DisplayName("when valid doc change cdc event then correct UpdateOneModel")
void testValidSinkDocumentForUpdate() {
  // Arrange: an update ("u") CDC event whose patch is the JSON form of UPDATE_DOC.
  BsonDocument key = BsonDocument.parse("{id: '1234'}");
  BsonDocument value = new BsonDocument("op", new BsonString("u"));
  value.append("patch", new BsonString(UPDATE_DOC.toJson()));

  // Act
  WriteModel<BsonDocument> result = UPDATE.perform(new SinkDocument(key, value));

  // Assert: an UpdateOneModel carrying the expected update and filter documents.
  assertTrue(result instanceof UpdateOneModel, "result expected to be of type UpdateOneModel");
  UpdateOneModel<BsonDocument> writeModel = (UpdateOneModel<BsonDocument>) result;

  assertEquals(UPDATE_DOC, writeModel.getUpdate(), "update doc not matching what is expected");
  assertTrue(
      writeModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC, writeModel.getFilter());
}
@Test
@DisplayName(
    "when valid doc change cdc event containing internal oplog fields then correct UpdateOneModel")
public void testValidSinkDocumentWithInternalOploagFieldForUpdate() {
  // Arrange: the patch carries oplog-internal fields; the resulting update is
  // expected to equal the plain UPDATE_DOC.
  BsonDocument key = BsonDocument.parse("{id: '1234'}");
  BsonDocument value = new BsonDocument("op", new BsonString("u"));
  value.append("patch", new BsonString(UPDATE_DOC_WITH_OPLOG_INTERNALS.toJson()));

  // Act
  WriteModel<BsonDocument> result = UPDATE.perform(new SinkDocument(key, value));

  // Assert
  assertTrue(
      result instanceof UpdateOneModel, () -> "result expected to be of type UpdateOneModel");
  UpdateOneModel<BsonDocument> writeModel = (UpdateOneModel<BsonDocument>) result;

  assertEquals(
      UPDATE_DOC, writeModel.getUpdate(), () -> "update doc not matching what is expected");
  assertTrue(
      writeModel.getFilter() instanceof BsonDocument,
      () -> "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC, writeModel.getFilter());
}
/**
 * Adds a new edit to the batch.
 *
 * <p>A {@code null} value queues a delete of the document whose id field equals {@code key}.
 * Any other value queues an upsert that sets the value field of that document.
 *
 * @param key the key to write
 * @param value the value to write. Null indicates we should delete this key
 * @return this
 */
public WriteBatch addEdit(byte[] key, byte[] value) {
  if (value == null) {
    // The models are created directly in the add(...) call so the diamond infers its type
    // argument from the edits list instead of falling back to a raw type.
    edits.add(new DeleteOneModel<>(eq(MongoConstants.ID_FIELD_NAME, new BsonBinary(key))));
    return this;
  }

  edits.add(
      new UpdateOneModel<>(
          eq(MongoConstants.ID_FIELD_NAME, new BsonBinary(key)),
          Updates.set(MongoConstants.VALUE_FIELD_NAME, new BsonBinary(value)),
          new UpdateOptions().upsert(true)));
  return this;
}
@Test
void bulkWrite() {
  ReactiveMongoCollection<Document> collection =
      client.getDatabase(DATABASE).getCollection("test");

  // Three inserts plus an update, a delete and a replace; the assertions below
  // expect three inserted documents and no deleted ones.
  BulkWriteResult result =
      collection
          .bulkWrite(
              Arrays.asList(
                  new InsertOneModel<>(new Document("_id", 4)),
                  new InsertOneModel<>(new Document("_id", 5)),
                  new InsertOneModel<>(new Document("_id", 6)),
                  new UpdateOneModel<>(
                      new Document("_id", 1), new Document("$set", new Document("x", 2))),
                  new DeleteOneModel<>(new Document("_id", 2)),
                  new ReplaceOneModel<>(
                      new Document("_id", 3), new Document("_id", 3).append("x", 4))))
          .await()
          .indefinitely();

  assertThat(result.getDeletedCount()).isEqualTo(0);
  assertThat(result.getInsertedCount()).isEqualTo(3);
}
@Test
void bulkWriteWithOptions() {
  ReactiveMongoCollection<Document> collection =
      client.getDatabase(DATABASE).getCollection("test");

  // Same mixed batch as the option-less variant, but submitted with explicit
  // BulkWriteOptions requesting ordered execution.
  BulkWriteResult result =
      collection
          .bulkWrite(
              Arrays.asList(
                  new InsertOneModel<>(new Document("_id", 4)),
                  new InsertOneModel<>(new Document("_id", 5)),
                  new InsertOneModel<>(new Document("_id", 6)),
                  new UpdateOneModel<>(
                      new Document("_id", 1), new Document("$set", new Document("x", 2))),
                  new DeleteOneModel<>(new Document("_id", 2)),
                  new ReplaceOneModel<>(
                      new Document("_id", 3), new Document("_id", 3).append("x", 4))),
              new BulkWriteOptions().ordered(true))
          .await()
          .indefinitely();

  assertThat(result.getDeletedCount()).isEqualTo(0);
  assertThat(result.getInsertedCount()).isEqualTo(3);
}
/**
 * Creates an {@link UpdateOneModel} that {@code $set}s the full value document together with
 * a modification timestamp and, via {@code $setOnInsert}, an insertion timestamp.
 *
 * <p>The value document obtained from {@code document} is mutated, since the
 * modified-timestamp field is appended to it before it is used as the {@code $set} payload.
 *
 * @param document the sink document providing the value document
 * @return the update model, filtered on the value document's id field
 * @throws DataException if no value document is present
 */
@Override
public WriteModel<BsonDocument> createWriteModel(SinkDocument document) {
  BsonDocument valueDoc = document.getValueDoc().orElseThrow(
      () -> new DataException(
          "error: cannot build the WriteModel since the value document was missing unexpectedly"));

  // A single instant feeds both timestamp fields.
  BsonDateTime now = new BsonDateTime(Instant.now().toEpochMilli());

  BsonDocument filter =
      new BsonDocument(DBCollection.ID_FIELD_NAME, valueDoc.get(DBCollection.ID_FIELD_NAME));
  BsonDocument update = new BsonDocument("$set", valueDoc.append(FIELDNAME_MODIFIED_TS, now));
  update.append("$setOnInsert", new BsonDocument(FIELDNAME_INSERTED_TS, now));

  return new UpdateOneModel<>(filter, update, UPDATE_OPTIONS);
}
@Test
@DisplayName("when valid doc change cdc event then correct UpdateOneModel")
public void testValidSinkDocumentForUpdate() {
  // Arrange: an update ("u") CDC event whose patch is the JSON form of UPDATE_DOC.
  BsonDocument key = new BsonDocument("id", new BsonString("1004"));
  BsonDocument value = new BsonDocument("op", new BsonString("u"));
  value.append("patch", new BsonString(UPDATE_DOC.toJson()));

  // Act
  WriteModel<BsonDocument> result = MONGODB_UPDATE.perform(new SinkDocument(key, value));

  // Assert
  assertTrue(
      result instanceof UpdateOneModel, () -> "result expected to be of type UpdateOneModel");
  UpdateOneModel<BsonDocument> writeModel = (UpdateOneModel<BsonDocument>) result;

  assertEquals(
      UPDATE_DOC, writeModel.getUpdate(), () -> "update doc not matching what is expected");
  assertTrue(
      writeModel.getFilter() instanceof BsonDocument,
      () -> "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC, writeModel.getFilter());
}
@Test
@DisplayName("when valid doc change cdc event containing internal oplog fields then correct UpdateOneModel")
public void testValidSinkDocumentWithInternalOploagFieldForUpdate() {
  // Arrange: the patch carries oplog-internal fields; the resulting update is
  // expected to equal the plain UPDATE_DOC.
  BsonDocument key = new BsonDocument("id", new BsonString("1004"));
  BsonDocument value = new BsonDocument("op", new BsonString("u"));
  value.append("patch", new BsonString(UPDATE_DOC_WITH_OPLOG_INTERNALS.toJson()));

  // Act
  WriteModel<BsonDocument> result = MONGODB_UPDATE.perform(new SinkDocument(key, value));

  // Assert
  assertTrue(
      result instanceof UpdateOneModel, () -> "result expected to be of type UpdateOneModel");
  UpdateOneModel<BsonDocument> writeModel = (UpdateOneModel<BsonDocument>) result;

  assertEquals(
      UPDATE_DOC, writeModel.getUpdate(), () -> "update doc not matching what is expected");
  assertTrue(
      writeModel.getFilter() instanceof BsonDocument,
      () -> "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC, writeModel.getFilter());
}
/**
 * Builds an {@link UpdateOneModel} whose filter is the business key stored as a sub-document
 * under the id field of the value document.
 *
 * <p>The id field is removed from the value document before the remaining fields are written
 * via {@code $set} together with a modification timestamp; {@code $setOnInsert} adds an
 * insertion timestamp taken from the same instant. The value document held by
 * {@code document} is mutated in the process.
 *
 * @param document the sink document providing the value document
 * @return the update model filtered on the business key document
 * @throws DataException if the value document is missing, or if a {@code BSONException} is
 *     raised while extracting the business key or assembling the update; the original
 *     exception is attached as the cause
 */
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
  BsonDocument vd =
      document
          .getValueDoc()
          .orElseThrow(
              () ->
                  new DataException(
                      "Error: cannot build the WriteModel since the value document was missing unexpectedly"));

  BsonDateTime dateTime = new BsonDateTime(Instant.now().toEpochMilli());

  try {
    BsonDocument businessKey = vd.getDocument(ID_FIELD);
    // The business key only serves as the filter; it must not be part of the $set payload.
    vd.remove(ID_FIELD);

    return new UpdateOneModel<>(
        businessKey,
        new BsonDocument("$set", vd.append(FIELD_NAME_MODIFIED_TS, dateTime))
            .append("$setOnInsert", new BsonDocument(FIELD_NAME_INSERTED_TS, dateTime)),
        UPDATE_OPTIONS);
  } catch (BSONException e) {
    // Keep the underlying BSON failure as the cause so the root problem stays diagnosable.
    throw new DataException(
        "Error: cannot build the WriteModel since the value document does not contain an _id field of"
            + " type BsonDocument which holds the business key fields",
        e);
  }
}
@Test
@DisplayName(
    "when sink document is valid for UpdateOneTimestampsStrategy then correct UpdateOneModel")
void testUpdateOneTimestampsStrategyWithValidSinkDocument() {
  // Arrange
  BsonDocument valueDoc =
      BsonDocument.parse("{_id: 1234, first_name: 'Grace', last_name: 'Hopper', active: false}");
  SinkDocument sinkDocument = new SinkDocument(null, valueDoc);

  // Act
  WriteModel<BsonDocument> result = UPDATE_ONE_TIMESTAMPS_STRATEGY.createWriteModel(sinkDocument);

  // Assert
  assertTrue(result instanceof UpdateOneModel, "result expected to be of type UpdateOneModel");
  UpdateOneModel<BsonDocument> writeModel = (UpdateOneModel<BsonDocument>) result;

  // The concrete timestamp cannot be predicted here, so the test only verifies that
  //   i) both timestamp fields exist,
  //  ii) both are BsonDateTime values (getDateTime would throw otherwise),
  // iii) both are equal right after creation.
  BsonDocument updateDoc = (BsonDocument) writeModel.getUpdate();
  BsonDocument setPart = updateDoc.getDocument("$set");
  BsonDateTime modifiedTS = setPart.getDateTime(UpdateOneTimestampsStrategy.FIELD_NAME_MODIFIED_TS);
  BsonDocument setOnInsertPart = updateDoc.getDocument("$setOnInsert");
  BsonDateTime insertedTS =
      setOnInsertPart.getDateTime(UpdateOneTimestampsStrategy.FIELD_NAME_INSERTED_TS);

  assertEquals(
      insertedTS, modifiedTS, "modified and inserted timestamps must initially be equal");
  assertTrue(
      writeModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC_UPDATE_TIMESTAMPS, writeModel.getFilter());
  assertTrue(writeModel.getOptions().isUpsert(), "update expected to be done in upsert mode");
}
@Test
@DisplayName(
    "when sink document is valid for UpdateOneBusinessKeyTimestampStrategy then correct UpdateOneModel")
void testUpdateOneBusinessKeyTimestampsStrategyWithValidSinkDocument() {
  // NOTE(review): despite its name, this test exercises UPDATE_ONE_TIMESTAMPS_STRATEGY and
  // only borrows the field-name constants from UpdateOneBusinessKeyTimestampStrategy.
  // This looks like a copy-paste slip - confirm whether a business-key strategy instance
  // and a document-typed _id were intended.
  BsonDocument valueDoc =
      BsonDocument.parse("{_id: 1234, first_name: 'Grace', last_name: 'Hopper', active: false}");
  SinkDocument sinkDocument = new SinkDocument(null, valueDoc);

  WriteModel<BsonDocument> result = UPDATE_ONE_TIMESTAMPS_STRATEGY.createWriteModel(sinkDocument);

  assertTrue(result instanceof UpdateOneModel, "result expected to be of type UpdateOneModel");
  UpdateOneModel<BsonDocument> writeModel = (UpdateOneModel<BsonDocument>) result;

  // The concrete timestamp cannot be predicted here, so the test only verifies that
  //   i) both timestamp fields exist,
  //  ii) both are BsonDateTime values (getDateTime would throw otherwise),
  // iii) both are equal right after creation.
  BsonDocument updateDoc = (BsonDocument) writeModel.getUpdate();
  BsonDocument setPart = updateDoc.getDocument("$set");
  BsonDateTime modifiedTS =
      setPart.getDateTime(UpdateOneBusinessKeyTimestampStrategy.FIELD_NAME_MODIFIED_TS);
  BsonDocument setOnInsertPart = updateDoc.getDocument("$setOnInsert");
  BsonDateTime insertedTS =
      setOnInsertPart.getDateTime(UpdateOneBusinessKeyTimestampStrategy.FIELD_NAME_INSERTED_TS);

  assertEquals(
      insertedTS, modifiedTS, "modified and inserted timestamps must initially be equal");
  assertTrue(
      writeModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC_UPDATE_TIMESTAMPS, writeModel.getFilter());
  assertTrue(writeModel.getOptions().isUpsert(), "update expected to be done in upsert mode");
}
/**
 * Converts this operation into an {@link UpdateOneModel} that sets the
 * {@code FIELD_DELETE_AT} field to a {@code BsonDateTime} of epoch millisecond 0 on the
 * document matched by {@code getFilter()}, with document validation bypassed.
 *
 * @return the update model for this operation
 */
@Override
public WriteModel<Document> toMongo() {
  final Bson filter = getFilter();

  final BsonDocument deleteAtEpoch = new BsonDocument(FIELD_DELETE_AT, new BsonDateTime(0L));
  final Bson update = new BsonDocument(SET, deleteAtEpoch);

  final UpdateOptions options = new UpdateOptions();
  options.bypassDocumentValidation(true);

  return new UpdateOneModel<>(filter, update, options);
}
/**
 * Put the records in the sink.
 *
 * <p>The records are processed in consecutive batches of at most {@code bulkSize} records.
 * Within a batch the records are grouped by topic, and each group is sent as one bulk write
 * to the collection registered for that topic in {@code mapping}. Every record becomes an
 * upsert keyed on its Kafka offset.
 *
 * <p>A failing bulk write is logged (including its stack trace) and does not abort the
 * remaining writes. A non-positive {@code bulkSize} is treated as a batch size of one
 * rather than looping forever.
 *
 * <p>NOTE(review): the {@code _id} is the Kafka offset only; offsets are presumably unique
 * per partition, not per topic, so records from different partitions may overwrite each
 * other - confirm this is intended.
 *
 * @param collection the set of records to send.
 */
@Override
public void put(Collection<SinkRecord> collection) {
  List<SinkRecord> records = new ArrayList<>(collection);
  int next = 0;
  while (next < records.size()) {
    Map<String, List<WriteModel<Document>>> bulks = new HashMap<>();

    // do/while guarantees progress: at least one record is consumed per batch even when
    // bulkSize is zero or negative.
    int inBatch = 0;
    do {
      SinkRecord record = records.get(next);
      Map<String, Object> jsonMap = SchemaUtils.toJsonMap((Struct) record.value());
      String topic = record.topic();

      List<WriteModel<Document>> topicBulk = bulks.get(topic);
      if (topicBulk == null) {
        topicBulk = new ArrayList<>();
        bulks.put(topic, topicBulk);
      }

      Document newDocument = new Document(jsonMap)
          .append("_id", record.kafkaOffset());
      log.trace("Adding to bulk: {}", newDocument);
      topicBulk.add(new UpdateOneModel<Document>(
          Filters.eq("_id", record.kafkaOffset()),
          new Document("$set", newDocument),
          new UpdateOptions().upsert(true)));

      next++;
      inBatch++;
    } while (inBatch < bulkSize && next < records.size());

    log.trace("Executing bulk");
    for (Map.Entry<String, List<WriteModel<Document>>> bulk : bulks.entrySet()) {
      try {
        mapping.get(bulk.getKey()).bulkWrite(bulk.getValue());
      } catch (Exception e) {
        // Deliberately best-effort: keep writing the other topics, but keep the stack trace.
        log.error(e.getMessage(), e);
      }
    }
  }
}
/**
 * Builds a pipeline-style {@link UpdateOneModel} that only replaces the stored document when
 * the incoming record is newer with respect to its Kafka coordinates.
 *
 * <p>The Kafka topic, partition and offset of {@code record} are appended to the value
 * document (mutating it). The single {@code $replaceRoot} stage keeps the stored document
 * ({@code $$ROOT}) when it has the same topic and partition and an offset greater than or
 * equal to the record's offset; otherwise the value document becomes the new root.
 *
 * @param document the sink document providing the value document
 * @param record the sink record supplying topic, partition and offset
 * @return the conditional update model, filtered on the value document's id field
 * @throws DataException if no value document is present
 */
@Override
public WriteModel<BsonDocument> createWriteModel(SinkDocument document, SinkRecord record) {
  BsonDocument valueDoc = document.getValueDoc().orElseThrow(
      () -> new DataException(
          "error: cannot build the WriteModel since the value document was missing unexpectedly"));

  // 1) Attach the Kafka coordinates to the value document.
  // NOTE: the field names are pre-defined for now; future versions might make them
  // configurable via external configuration properties.
  BsonDocument kafkaCoords = new BsonDocument(FIELD_TOPIC, new BsonString(record.topic()));
  kafkaCoords.append(FIELD_PARTITION, new BsonInt32(record.kafkaPartition()));
  kafkaCoords.append(FIELD_OFFSET, new BsonInt64(record.kafkaOffset()));
  valueDoc.append(FIELD_KAFKA_COORDS, kafkaCoords);

  // 2) Build the conditional update pipeline based on the Kafka coordinates. When records
  // get replayed - e.g. due to uncommitted offsets or newly started connectors with
  // different names - stale data must never overwrite newer data already in the sink.
  BsonDocument sameTopic = new BsonDocument("$eq", new BsonArray(Arrays.asList(
      new BsonString("$$ROOT." + FIELD_KAFKA_COORDS + "." + FIELD_TOPIC),
      new BsonString(record.topic()))));
  BsonDocument samePartition = new BsonDocument("$eq", new BsonArray(Arrays.asList(
      new BsonString("$$ROOT." + FIELD_KAFKA_COORDS + "." + FIELD_PARTITION),
      new BsonInt32(record.kafkaPartition()))));
  BsonDocument storedOffsetNotOlder = new BsonDocument("$gte", new BsonArray(Arrays.asList(
      new BsonString("$$ROOT." + FIELD_KAFKA_COORDS + "." + FIELD_OFFSET),
      new BsonInt64(record.kafkaOffset()))));

  BsonDocument keepStored = new BsonDocument("$and",
      new BsonArray(Arrays.asList(sameTopic, samePartition, storedOffsetNotOlder)));

  BsonDocument condition = new BsonDocument("if", keepStored);
  condition.append("then", new BsonString("$$ROOT"));
  condition.append("else", valueDoc);

  BsonDocument replaceRootStage = new BsonDocument("$replaceRoot",
      new BsonDocument("newRoot", new BsonDocument("$cond", condition)));

  List<Bson> conditionalUpdatePipeline = new ArrayList<>();
  conditionalUpdatePipeline.add(replaceRootStage);

  BsonDocument filter =
      new BsonDocument(DBCollection.ID_FIELD_NAME, valueDoc.get(DBCollection.ID_FIELD_NAME));
  return new UpdateOneModel<>(filter, conditionalUpdatePipeline, UPDATE_OPTIONS);
}
@Test
@DisplayName("when sink document is valid for UpdateOneTimestampsStrategy then correct UpdateOneModel")
public void testUpdateOneTimestampsStrategyWithValidSinkDocument() {
  // Arrange
  BsonDocument valueDoc = new BsonDocument(DBCollection.ID_FIELD_NAME, new BsonInt32(1004));
  valueDoc.append("first_name", new BsonString("Anne"));
  valueDoc.append("last_name", new BsonString("Kretchmar"));
  valueDoc.append("email", new BsonString("[email protected]"));

  // Act
  WriteModel<BsonDocument> result =
      UPDATE_ONE_TIMESTAMPS_STRATEGY.createWriteModel(new SinkDocument(null, valueDoc));

  // Assert
  assertTrue(
      result instanceof UpdateOneModel, () -> "result expected to be of type UpdateOneModel");
  UpdateOneModel<BsonDocument> writeModel = (UpdateOneModel<BsonDocument>) result;

  // The concrete timestamp cannot be predicted here, so the test only verifies that
  //   i) both timestamp fields exist,
  //  ii) both are BsonDateTime values (getDateTime would throw otherwise),
  // iii) both are equal right after creation.
  BsonDocument updateDoc = (BsonDocument) writeModel.getUpdate();
  BsonDocument setPart = updateDoc.getDocument("$set");
  BsonDateTime modifiedTS = setPart.getDateTime(UpdateOneTimestampsStrategy.FIELDNAME_MODIFIED_TS);
  BsonDocument setOnInsertPart = updateDoc.getDocument("$setOnInsert");
  BsonDateTime insertedTS =
      setOnInsertPart.getDateTime(UpdateOneTimestampsStrategy.FIELDNAME_INSERTED_TS);

  assertTrue(
      insertedTS.equals(modifiedTS),
      () -> "modified and inserted timestamps must initially be equal");
  assertTrue(
      writeModel.getFilter() instanceof BsonDocument,
      () -> "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC_UPDATE_TIMESTAMPS, writeModel.getFilter());
  assertTrue(
      writeModel.getOptions().isUpsert(), () -> "update expected to be done in upsert mode");
}
@Test
@DisplayName("when sink document and sink record is valid for MonotonicWritesDefaultStrategy then correct UpdateOneModel")
public void testMonotonicWritesDefaultStrategyWithValidSinkDocumentAndSinkRecord() {
  // Arrange
  BsonDocument valueDoc = new BsonDocument(DBCollection.ID_FIELD_NAME, new BsonInt32(1004));
  valueDoc.append("first_name", new BsonString("Anne"));
  valueDoc.append("last_name", new BsonString("Kretchmar"));
  valueDoc.append("email", new BsonString("[email protected]"));
  SinkRecord sinkRecord = new SinkRecord("some-topic", 1, null, null, null, null, 111);

  // Act
  WriteModel<BsonDocument> result =
      MONOTONIC_WRITES_DEFAULT_STRATEGY.createWriteModel(
          new SinkDocument(null, valueDoc), sinkRecord);

  // Assert: model type and filter.
  assertTrue(
      result instanceof UpdateOneModel, () -> "result expected to be of type UpdateOneModel");
  UpdateOneModel<BsonDocument> writeModel = (UpdateOneModel<BsonDocument>) result;
  assertTrue(
      writeModel.getFilter() instanceof BsonDocument,
      () -> "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC_MONOTONIC_WRITES_DEFAULT, writeModel.getFilter());

  // Assert: the pipeline consists of exactly one $replaceRoot/$cond stage.
  List<? extends Bson> updatePipeline = writeModel.getUpdatePipeline();
  assertNotNull(updatePipeline, () -> "update pipeline must not be null");
  assertEquals(
      1, updatePipeline.size(),
      () -> "update pipeline expected to contain exactly 1 element only");

  BsonDocument stage = (BsonDocument) updatePipeline.get(0);
  BsonDocument condDoc =
      stage.get("$replaceRoot").asDocument().get("newRoot").asDocument().get("$cond").asDocument();
  BsonDocument ifDoc = condDoc.get("if").asDocument();
  String coordsPrefix = "$$ROOT." + MonotonicWritesDefaultStrategy.FIELD_KAFKA_COORDS + ".";

  // Clause 1: stored topic equals the record's topic.
  BsonDocument topicClause = ifDoc.get("$and").asArray().get(0).asDocument();
  String condIfP1Name = topicClause.get("$eq").asArray().get(0).asString().getValue();
  String condIfP1Value = topicClause.get("$eq").asArray().get(1).asString().getValue();
  assertEquals(coordsPrefix + MonotonicWritesDefaultStrategy.FIELD_TOPIC, condIfP1Name);
  assertEquals(sinkRecord.topic(), condIfP1Value);

  // Clause 2: stored partition equals the record's partition.
  BsonDocument partitionClause = ifDoc.get("$and").asArray().get(1).asDocument();
  String condIfP2Name = partitionClause.get("$eq").asArray().get(0).asString().getValue();
  Integer condIfP2Value = partitionClause.get("$eq").asArray().get(1).asInt32().getValue();
  assertEquals(coordsPrefix + MonotonicWritesDefaultStrategy.FIELD_PARTITION, condIfP2Name);
  assertEquals(sinkRecord.kafkaPartition(), condIfP2Value);

  // Clause 3: stored offset is greater than or equal to the record's offset.
  BsonDocument offsetClause = ifDoc.get("$and").asArray().get(2).asDocument();
  String condIfP3Name = offsetClause.get("$gte").asArray().get(0).asString().getValue();
  Long condIfP3Value = offsetClause.get("$gte").asArray().get(1).asInt64().getValue();
  assertEquals(coordsPrefix + MonotonicWritesDefaultStrategy.FIELD_OFFSET, condIfP3Name);
  assertEquals(sinkRecord.kafkaOffset(), condIfP3Value);

  // Branches: keep the stored document, or replace it with the expected document.
  String condThen = condDoc.get("then").asString().getValue();
  assertEquals("$$ROOT", condThen, () -> "conditional then branch must be $$ROOT");
  BsonDocument condElseDoc = condDoc.get("else").asDocument();
  assertEquals(
      MONOTONIC_WRITES_DOC_DEFAULT, condElseDoc,
      () -> "conditional else branch contains wrong update document");

  assertTrue(
      writeModel.getOptions().isUpsert(),
      () -> "replacement expected to be done in upsert mode");
}
/**
 * Applies a remote update description to a single synchronized document, identified by its
 * id, and collects the resulting writes in a write model container.
 *
 * <p>The document's synchronization config is marked as having its pending writes completed
 * while the namespace write lock is held. The local update and its change event are only
 * added when the sanitized update description is non-empty; the config replacement is
 * always added.
 *
 * @param nsConfig the namespace synchronization config of the namespace where the document
 *                 lives.
 * @param documentId the _id of the document.
 * @param updateDescription the update description to apply locally.
 * @param atVersion the Stitch sync version that should be written locally for this update.
 * @param withHash the FNV-1a hash of the sanitized document.
 * @return the container holding the writes to perform, or {@code null} when the update
 *         description is empty or the document has no synchronization config.
 */
@CheckReturnValue
private LocalSyncWriteModelContainer updateOneFromRemote(
    final NamespaceSynchronizationConfig nsConfig,
    final BsonValue documentId,
    final UpdateDescription updateDescription,
    final BsonDocument atVersion,
    final long withHash
) {
  // completely no-op updates need no work at all
  if (updateDescription.isEmpty()) {
    return null;
  }

  final UpdateDescription sanitized = sanitizeUpdateDescription(updateDescription);
  final MongoNamespace namespace = nsConfig.getNamespace();

  final Lock nsWriteLock =
      this.syncConfig.getNamespaceConfig(namespace).getLock().writeLock();
  nsWriteLock.lock();
  final CoreDocumentSynchronizationConfig docConfig;
  try {
    docConfig = syncConfig.getSynchronizedDocument(namespace, documentId);
    if (docConfig == null) {
      return null;
    }
    docConfig.setPendingWritesComplete(withHash, atVersion);
  } finally {
    nsWriteLock.unlock();
  }

  final LocalSyncWriteModelContainer container = newWriteModelContainer(nsConfig);

  // the event and the local write are only relevant when something is left
  // of the update description after sanitizing it
  if (!sanitized.isEmpty()) {
    container.addDocIDs(documentId);

    final ChangeEvent<BsonDocument> event = ChangeEvents.changeEventForLocalUpdate(
        namespace,
        documentId,
        sanitized,
        null,
        false
    );
    container.addLocalChangeEvent(event);

    // never upsert here: a missing document means we are out of date on it, and
    // an update change event can not be applied as an upsert
    final UpdateOptions noUpsert = new UpdateOptions().upsert(false);
    container.addLocalWrite(new UpdateOneModel<>(
        getDocumentIdFilter(documentId),
        sanitized.toUpdateDocument(),
        noUpsert
    ));
  }

  container.addConfigWrite(new ReplaceOneModel<>(
      CoreDocumentSynchronizationConfig.getDocFilter(namespace, docConfig.getDocumentId()),
      docConfig
  ));

  return container;
}
/**
 * Counts the queued edits that are {@link UpdateOneModel} instances, i.e. the updates
 * (edits or inserts) in this batch.
 *
 * @return Number of updates
 */
public long getUpdateCount() {
  return this.edits.stream()
      .filter(UpdateOneModel.class::isInstance)
      .count();
}