下面列出了怎么用com.mongodb.client.model.ReplaceOneModel的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public WriteModel<BsonDocument> createWriteModel(SinkDocument document) {
BsonDocument vd = document.getValueDoc().orElseThrow(
() -> new DataException("error: cannot build the WriteModel since"
+ " the value document was missing unexpectedly")
);
BsonValue businessKey = vd.get(DBCollection.ID_FIELD_NAME);
if(businessKey == null || !(businessKey instanceof BsonDocument)) {
throw new DataException("error: cannot build the WriteModel since"
+ " the value document does not contain an _id field of type BsonDocument"
+ " which holds the business key fields");
}
vd.remove(DBCollection.ID_FIELD_NAME);
return new ReplaceOneModel<>((BsonDocument)businessKey, vd, UPDATE_OPTIONS);
}
@Override
public WriteModel<BsonDocument> perform(final SinkDocument doc) {
BsonDocument keyDoc =
doc.getKeyDoc()
.orElseThrow(
() -> new DataException("Error: key doc must not be missing for update operation"));
BsonDocument valueDoc =
doc.getValueDoc()
.orElseThrow(
() ->
new DataException("Error: value doc must not be missing for update operation"));
try {
BsonDocument filterDoc =
RdbmsHandler.generateFilterDoc(keyDoc, valueDoc, OperationType.UPDATE);
BsonDocument replaceDoc =
RdbmsHandler.generateUpsertOrReplaceDoc(keyDoc, valueDoc, filterDoc);
return new ReplaceOneModel<>(filterDoc, replaceDoc, REPLACE_OPTIONS);
} catch (Exception exc) {
throw new DataException(exc);
}
}
@Override
public WriteModel<BsonDocument> perform(final SinkDocument doc) {
BsonDocument keyDoc =
doc.getKeyDoc()
.orElseThrow(
() -> new DataException("Error: key doc must not be missing for insert operation"));
BsonDocument valueDoc =
doc.getValueDoc()
.orElseThrow(
() ->
new DataException("Error: value doc must not be missing for insert operation"));
try {
BsonDocument filterDoc =
RdbmsHandler.generateFilterDoc(keyDoc, valueDoc, OperationType.CREATE);
BsonDocument upsertDoc = RdbmsHandler.generateUpsertOrReplaceDoc(keyDoc, valueDoc, filterDoc);
return new ReplaceOneModel<>(filterDoc, upsertDoc, REPLACE_OPTIONS);
} catch (Exception exc) {
throw new DataException(exc);
}
}
@Override
public WriteModel<BsonDocument> perform(final SinkDocument doc) {
BsonDocument valueDoc =
doc.getValueDoc()
.orElseThrow(
() ->
new DataException("Error: value doc must not be missing for insert operation"));
try {
BsonDocument insertDoc =
BsonDocument.parse(valueDoc.get(JSON_DOC_FIELD_PATH).asString().getValue());
return new ReplaceOneModel<>(
new BsonDocument(ID_FIELD, insertDoc.get(ID_FIELD)), insertDoc, REPLACE_OPTIONS);
} catch (Exception exc) {
throw new DataException(exc);
}
}
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
BsonDocument vd =
document
.getValueDoc()
.orElseThrow(
() ->
new DataException(
"Error: cannot build the WriteModel since the value document was missing unexpectedly"));
try {
BsonDocument businessKey = vd.getDocument(ID_FIELD);
vd.remove(ID_FIELD);
return new ReplaceOneModel<>(businessKey, vd, REPLACE_OPTIONS);
} catch (BSONException e) {
throw new DataException(
"Error: cannot build the WriteModel since the value document does not contain an _id field of"
+ " type BsonDocument which holds the business key fields");
}
}
@Test
@DisplayName("when valid cdc event with compound PK then correct ReplaceOneModel")
void testValidSinkDocumentCompoundPK() {
BsonDocument filterDoc = BsonDocument.parse("{_id: {idA: 123, idB: 'ABC'}}");
BsonDocument replacementDoc = BsonDocument.parse("{_id: {idA: 123, idB: 'ABC'}, active: true}");
BsonDocument keyDoc = BsonDocument.parse("{idA: 123, idB: 'ABC'}");
BsonDocument valueDoc =
BsonDocument.parse("{op: 'c', after: {_id: {idA: 123, idB: 'ABC'}, active: true}}");
WriteModel<BsonDocument> result = RDBMS_INSERT.perform(new SinkDocument(keyDoc, valueDoc));
assertTrue(result instanceof ReplaceOneModel, "result expected to be of type ReplaceOneModel");
ReplaceOneModel<BsonDocument> writeModel = (ReplaceOneModel<BsonDocument>) result;
assertEquals(
replacementDoc,
writeModel.getReplacement(),
"replacement doc not matching what is expected");
assertTrue(
writeModel.getFilter() instanceof BsonDocument,
"filter expected to be of type BsonDocument");
assertEquals(filterDoc, writeModel.getFilter());
assertTrue(
writeModel.getReplaceOptions().isUpsert(),
"replacement expected to be done in upsert mode");
}
@Test
@DisplayName("when valid cdc event then correct ReplaceOneModel")
void testValidSinkDocument() {
BsonDocument keyDoc = new BsonDocument("id", new BsonString("1234"));
BsonDocument valueDoc =
new BsonDocument("op", new BsonString("c"))
.append("after", new BsonString(REPLACEMENT_DOC.toJson()));
WriteModel<BsonDocument> result = INSERT.perform(new SinkDocument(keyDoc, valueDoc));
assertTrue(result instanceof ReplaceOneModel, "result expected to be of type ReplaceOneModel");
ReplaceOneModel<BsonDocument> writeModel = (ReplaceOneModel<BsonDocument>) result;
assertEquals(
REPLACEMENT_DOC,
writeModel.getReplacement(),
"replacement doc not matching what is expected");
assertTrue(
writeModel.getFilter() instanceof BsonDocument,
"filter expected to be of type BsonDocument");
assertEquals(FILTER_DOC, writeModel.getFilter());
assertTrue(
writeModel.getReplaceOptions().isUpsert(),
"replacement expected to be done in upsert mode");
}
@Test
@DisplayName(
"when sink document is valid for ReplaceOneDefaultStrategy then correct ReplaceOneModel")
void testReplaceOneDefaultStrategyWithValidSinkDocument() {
BsonDocument valueDoc =
BsonDocument.parse("{_id: 1234, first_name: 'Grace', last_name: 'Hopper'}");
WriteModel<BsonDocument> result =
REPLACE_ONE_DEFAULT_STRATEGY.createWriteModel(new SinkDocument(null, valueDoc));
assertTrue(result instanceof ReplaceOneModel, "result expected to be of type ReplaceOneModel");
ReplaceOneModel<BsonDocument> writeModel = (ReplaceOneModel<BsonDocument>) result;
assertEquals(
REPLACEMENT_DOC_DEFAULT,
writeModel.getReplacement(),
"replacement doc not matching what is expected");
assertTrue(
writeModel.getFilter() instanceof BsonDocument,
"filter expected to be of type BsonDocument");
assertEquals(FILTER_DOC_REPLACE_DEFAULT, writeModel.getFilter());
assertTrue(
writeModel.getReplaceOptions().isUpsert(),
"replacement expected to be done in upsert mode");
}
@Test
void bulkWrite() {
ReactiveMongoDatabase database = client.getDatabase(DATABASE);
ReactiveMongoCollection<Document> collection = database.getCollection("test");
BulkWriteResult result = collection.bulkWrite(Arrays.asList(
new InsertOneModel<>(new Document("_id", 4)),
new InsertOneModel<>(new Document("_id", 5)),
new InsertOneModel<>(new Document("_id", 6)),
new UpdateOneModel<>(new Document("_id", 1),
new Document("$set", new Document("x", 2))),
new DeleteOneModel<>(new Document("_id", 2)),
new ReplaceOneModel<>(new Document("_id", 3),
new Document("_id", 3).append("x", 4))))
.await().indefinitely();
assertThat(result.getDeletedCount()).isEqualTo(0);
assertThat(result.getInsertedCount()).isEqualTo(3);
}
@Test
void bulkWriteWithOptions() {
ReactiveMongoDatabase database = client.getDatabase(DATABASE);
ReactiveMongoCollection<Document> collection = database.getCollection("test");
BulkWriteResult result = collection.bulkWrite(Arrays.asList(
new InsertOneModel<>(new Document("_id", 4)),
new InsertOneModel<>(new Document("_id", 5)),
new InsertOneModel<>(new Document("_id", 6)),
new UpdateOneModel<>(new Document("_id", 1),
new Document("$set", new Document("x", 2))),
new DeleteOneModel<>(new Document("_id", 2)),
new ReplaceOneModel<>(new Document("_id", 3),
new Document("_id", 3).append("x", 4))),
new BulkWriteOptions().ordered(true)).await().indefinitely();
assertThat(result.getDeletedCount()).isEqualTo(0);
assertThat(result.getInsertedCount()).isEqualTo(3);
}
private static Uni<Void> persistOrUpdate(ReactiveMongoCollection collection, List<Object> entities) {
//this will be an ordered bulk: it's less performant than a unordered one but will fail at the first failed write
List<WriteModel> bulk = new ArrayList<>();
for (Object entity : entities) {
//we transform the entity as a document first
BsonDocument document = getBsonDocument(collection, entity);
//then we get its id field and create a new Document with only this one that will be our replace query
BsonValue id = document.get(ID);
if (id == null) {
//insert with autogenerated ID
bulk.add(new InsertOneModel(entity));
} else {
//insert with user provided ID or update
BsonDocument query = new BsonDocument().append(ID, id);
bulk.add(new ReplaceOneModel(query, entity,
new ReplaceOptions().upsert(true)));
}
}
return collection.bulkWrite(bulk).onItem().ignore().andContinueWithNull();
}
private static void persistOrUpdate(MongoCollection collection, List<Object> entities) {
//this will be an ordered bulk: it's less performant than a unordered one but will fail at the first failed write
List<WriteModel> bulk = new ArrayList<>();
for (Object entity : entities) {
//we transform the entity as a document first
BsonDocument document = getBsonDocument(collection, entity);
//then we get its id field and create a new Document with only this one that will be our replace query
BsonValue id = document.get(ID);
if (id == null) {
//insert with autogenerated ID
bulk.add(new InsertOneModel(entity));
} else {
//insert with user provided ID or update
BsonDocument query = new BsonDocument().append(ID, id);
bulk.add(new ReplaceOneModel(query, entity,
new ReplaceOptions().upsert(true)));
}
}
collection.bulkWrite(bulk);
}
@Override
public WriteModel<BsonDocument> perform(SinkDocument doc) {
BsonDocument keyDoc = doc.getKeyDoc().orElseThrow(
() -> new DataException("error: key doc must not be missing for update operation")
);
BsonDocument valueDoc = doc.getValueDoc().orElseThrow(
() -> new DataException("error: value doc must not be missing for update operation")
);
try {
BsonDocument filterDoc = RdbmsHandler.generateFilterDoc(keyDoc, valueDoc, OperationType.UPDATE);
BsonDocument replaceDoc = RdbmsHandler.generateUpsertOrReplaceDoc(keyDoc, valueDoc, filterDoc);
return new ReplaceOneModel<>(filterDoc, replaceDoc, UPDATE_OPTIONS);
} catch (Exception exc) {
throw new DataException(exc);
}
}
@Override
public WriteModel<BsonDocument> perform(SinkDocument doc) {
BsonDocument keyDoc = doc.getKeyDoc().orElseThrow(
() -> new DataException("error: key doc must not be missing for insert operation")
);
BsonDocument valueDoc = doc.getValueDoc().orElseThrow(
() -> new DataException("error: value doc must not be missing for insert operation")
);
try {
BsonDocument filterDoc = RdbmsHandler.generateFilterDoc(keyDoc, valueDoc, OperationType.CREATE);
BsonDocument upsertDoc = RdbmsHandler.generateUpsertOrReplaceDoc(keyDoc, valueDoc, filterDoc);
return new ReplaceOneModel<>(filterDoc, upsertDoc, UPDATE_OPTIONS);
} catch (Exception exc) {
throw new DataException(exc);
}
}
@Override
public WriteModel<BsonDocument> perform(SinkDocument doc) {
BsonDocument valueDoc = doc.getValueDoc().orElseThrow(
() -> new DataException("error: value doc must not be missing for insert operation")
);
try {
BsonDocument insertDoc = BsonDocument.parse(
valueDoc.get(JSON_DOC_FIELD_PATH).asString().getValue()
);
return new ReplaceOneModel<>(
new BsonDocument(DBCollection.ID_FIELD_NAME,
insertDoc.get(DBCollection.ID_FIELD_NAME)),
insertDoc,
UPDATE_OPTIONS
);
} catch(Exception exc) {
throw new DataException(exc);
}
}
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
BsonDocument vd =
document
.getValueDoc()
.orElseThrow(
() ->
new DataException(
"Error: cannot build the WriteModel since the value document was missing unexpectedly"));
return new ReplaceOneModel<>(new BsonDocument(ID_FIELD, vd.get(ID_FIELD)), vd, REPLACE_OPTIONS);
}
@Test
@DisplayName("when valid cdc event with single field PK then correct ReplaceOneModel")
void testValidSinkDocumentSingleFieldPK() {
BsonDocument filterDoc = BsonDocument.parse("{_id: {id: 1234}}");
BsonDocument replacementDoc =
BsonDocument.parse("{_id: {id: 1234}, first_name: 'Grace', last_name: 'Hopper'}");
BsonDocument keyDoc = BsonDocument.parse("{id: 1234}");
BsonDocument valueDoc =
BsonDocument.parse(
"{op: 'c', after: {id: 1234, first_name: 'Grace', last_name: 'Hopper'}}");
WriteModel<BsonDocument> result = RDBMS_INSERT.perform(new SinkDocument(keyDoc, valueDoc));
assertTrue(result instanceof ReplaceOneModel, "result expected to be of type ReplaceOneModel");
ReplaceOneModel<BsonDocument> writeModel = (ReplaceOneModel<BsonDocument>) result;
assertEquals(
replacementDoc,
writeModel.getReplacement(),
"replacement doc not matching what is expected");
assertTrue(
writeModel.getFilter() instanceof BsonDocument,
"filter expected to be of type BsonDocument");
assertEquals(filterDoc, writeModel.getFilter());
assertTrue(
writeModel.getReplaceOptions().isUpsert(),
"replacement expected to be done in upsert mode");
}
private void verifyResultsNoPK(final BsonDocument valueDoc) {
// NOTE: for both filterDoc and replacementDoc _id have a generated ObjectId fetched from the
// WriteModel
BsonDocument filterDoc = new BsonDocument();
BsonDocument keyDoc = new BsonDocument();
BsonDocument replacementDoc = valueDoc.getDocument("after").clone();
WriteModel<BsonDocument> result = RDBMS_INSERT.perform(new SinkDocument(keyDoc, valueDoc));
assertTrue(result instanceof ReplaceOneModel, "result expected to be of type ReplaceOneModel");
ReplaceOneModel<BsonDocument> writeModel = (ReplaceOneModel<BsonDocument>) result;
assertTrue(
writeModel.getReplacement().isObjectId("_id"),
"replacement doc must contain _id field of type ObjectID");
replacementDoc.put("_id", writeModel.getReplacement().getObjectId("_id"));
assertEquals(
replacementDoc,
writeModel.getReplacement(),
"replacement doc not matching what is expected");
assertTrue(
writeModel.getFilter() instanceof BsonDocument,
"filter expected to be of type BsonDocument");
assertTrue(
((BsonDocument) writeModel.getFilter()).isObjectId("_id"),
"filter doc must contain _id field of type ObjectID");
filterDoc.put("_id", ((BsonDocument) writeModel.getFilter()).getObjectId("_id"));
assertEquals(filterDoc, writeModel.getFilter());
assertTrue(
writeModel.getReplaceOptions().isUpsert(),
"replacement expected to be done in upsert mode");
}
@Test
@DisplayName("when valid doc replace cdc event then correct ReplaceOneModel")
void testValidSinkDocumentForReplacement() {
BsonDocument keyDoc = BsonDocument.parse("{id: 1234}");
BsonDocument valueDoc =
new BsonDocument("op", new BsonString("u"))
.append("patch", new BsonString(REPLACEMENT_DOC.toJson()));
WriteModel<BsonDocument> result = UPDATE.perform(new SinkDocument(keyDoc, valueDoc));
assertTrue(result instanceof ReplaceOneModel, "result expected to be of type ReplaceOneModel");
ReplaceOneModel<BsonDocument> writeModel = (ReplaceOneModel<BsonDocument>) result;
assertEquals(
REPLACEMENT_DOC,
writeModel.getReplacement(),
"replacement doc not matching what is expected");
assertTrue(
writeModel.getFilter() instanceof BsonDocument,
"filter expected to be of type BsonDocument");
assertEquals(FILTER_DOC, writeModel.getFilter());
assertTrue(
writeModel.getReplaceOptions().isUpsert(),
"replacement expected to be done in upsert mode");
}
@Test
@DisplayName(
"when sink document is valid for ReplaceOneBusinessKeyStrategy then correct ReplaceOneModel")
void testReplaceOneBusinessKeyStrategyWithValidSinkDocument() {
BsonDocument valueDoc =
BsonDocument.parse(
"{_id: {first_name: 'Grace', last_name: 'Hopper'}, "
+ "first_name: 'Grace', last_name: 'Hopper', active: false}}");
WriteModel<BsonDocument> result =
REPLACE_ONE_BUSINESS_KEY_STRATEGY.createWriteModel(new SinkDocument(null, valueDoc));
assertTrue(result instanceof ReplaceOneModel, "result expected to be of type ReplaceOneModel");
ReplaceOneModel<BsonDocument> writeModel = (ReplaceOneModel<BsonDocument>) result;
assertEquals(
REPLACEMENT_DOC_BUSINESS_KEY,
writeModel.getReplacement(),
"replacement doc not matching what is expected");
assertTrue(
writeModel.getFilter() instanceof BsonDocument,
"filter expected to be of type BsonDocument");
assertEquals(FILTER_DOC_REPLACE_BUSINESS_KEY, writeModel.getFilter());
assertTrue(
writeModel.getReplaceOptions().isUpsert(),
"replacement expected to be done in upsert mode");
}
@Override
public boolean upsertBar(String dbName, String collectionName, List<BarField> barList) {
if (barList == null || barList.isEmpty()) {
logger.error("更新插入Bar集合错误,数据集合为空");
return false;
}
List<WriteModel<Document>> writeModelList = new ArrayList<WriteModel<Document>>();
long beginTime = System.currentTimeMillis();
for (BarField bar : barList) {
Document filterDocument = new Document();
filterDocument.put("unifiedSymbol", bar.getUnifiedSymbol());
filterDocument.put("actionTimestamp", bar.getActionTimestamp());
Document barDocument = barToDocument(bar);
ReplaceOptions replaceOptions = new ReplaceOptions();
replaceOptions.upsert(true);
ReplaceOneModel<Document> replaceOneModel = new ReplaceOneModel<Document>(filterDocument, barDocument, replaceOptions);
writeModelList.add(replaceOneModel);
}
logger.info("更新插入Bar集合,数据库{},集合{},数据转换耗时{}ms,共{}条数据", dbName, collectionName, (System.currentTimeMillis() - beginTime), barList.size());
beginTime = System.currentTimeMillis();
todayMarketDataDBClient.getDatabase(dbName).getCollection(collectionName).createIndex(Indexes.ascending("actionTimestamp", "unifiedSymbol"));
todayMarketDataDBClient.getDatabase(dbName).getCollection(collectionName).bulkWrite(writeModelList);
logger.info("更新插入Bar集合,数据库{},集合{},数据库操作耗时{}ms,共{}条操作", dbName, collectionName, (System.currentTimeMillis() - beginTime), writeModelList.size());
return true;
}
@Override
public boolean upsertTick(String dbName, String collectionName, List<TickField> tickList) {
if (tickList == null || tickList.isEmpty()) {
logger.error("更新插入Tick集合错误,数据集合为空");
return false;
}
List<WriteModel<Document>> writeModelList = new ArrayList<WriteModel<Document>>();
long beginTime = System.currentTimeMillis();
for (TickField tick : tickList) {
Document filterDocument = new Document();
filterDocument.put("unifiedSymbol", tick.getUnifiedSymbol());
filterDocument.put("actionTimestamp", tick.getActionTimestamp());
Document tickDocument = tickToDocument(tick);
ReplaceOptions replaceOptions = new ReplaceOptions();
replaceOptions.upsert(true);
ReplaceOneModel<Document> replaceOneModel = new ReplaceOneModel<Document>(filterDocument, tickDocument, replaceOptions);
writeModelList.add(replaceOneModel);
}
logger.info("更新插入Tick集合,数据库{},集合{},数据转换耗时{}ms,共{}条数据", dbName, collectionName, (System.currentTimeMillis() - beginTime), tickList.size());
beginTime = System.currentTimeMillis();
todayMarketDataDBClient.getDatabase(dbName).getCollection(collectionName).createIndex(Indexes.ascending("actionTimestamp"));
todayMarketDataDBClient.getDatabase(dbName).getCollection(collectionName).bulkWrite(writeModelList);
logger.info("更新插入Tick集合,数据库{},集合{},数据库操作耗时{}ms,共{}条操作", dbName, collectionName, (System.currentTimeMillis() - beginTime), writeModelList.size());
return true;
}
public void Save(Document doc) {
if (!doc.containsKey("_id")) {
Create(doc);
return;
}
Document find = new Document("_id", doc.get("_id"));
UpdateOptions uo = new UpdateOptions();
uo.upsert(true);
ops.add(new ReplaceOneModel<Document>(find, doc, uo));
FlushOpsIfFull();
}
@Override
public WriteModel<BsonDocument> createWriteModel(SinkDocument document) {
BsonDocument vd = document.getValueDoc().orElseThrow(
() -> new DataException("error: cannot build the WriteModel since"
+ " the value document was missing unexpectedly")
);
return new ReplaceOneModel<>(
new BsonDocument(DBCollection.ID_FIELD_NAME,
vd.get(DBCollection.ID_FIELD_NAME)),
vd,
UPDATE_OPTIONS);
}
@Test
@DisplayName("when sink document is valid for ReplaceOneDefaultStrategy then correct ReplaceOneModel")
public void testReplaceOneDefaultStrategyWithValidSinkDocument() {
BsonDocument valueDoc = new BsonDocument(DBCollection.ID_FIELD_NAME,new BsonInt32(1004))
.append("first_name",new BsonString("Anne"))
.append("last_name",new BsonString("Kretchmar"))
.append("email",new BsonString("[email protected]"));
WriteModel<BsonDocument> result =
REPLACE_ONE_DEFAULT_STRATEGY.createWriteModel(new SinkDocument(null,valueDoc));
assertTrue(result instanceof ReplaceOneModel,
() -> "result expected to be of type ReplaceOneModel");
ReplaceOneModel<BsonDocument> writeModel =
(ReplaceOneModel<BsonDocument>) result;
assertEquals(REPLACEMENT_DOC_DEFAULT,writeModel.getReplacement(),
()-> "replacement doc not matching what is expected");
assertTrue(writeModel.getFilter() instanceof BsonDocument,
() -> "filter expected to be of type BsonDocument");
assertEquals(FILTER_DOC_REPLACE_DEFAULT,writeModel.getFilter());
assertTrue(writeModel.getOptions().isUpsert(),
() -> "replacement expected to be done in upsert mode");
}
@Test
@DisplayName("when sink document is valid for ReplaceOneBusinessKeyStrategy then correct ReplaceOneModel")
public void testReplaceOneBusinessKeyStrategyWithValidSinkDocument() {
BsonDocument valueDoc = new BsonDocument(DBCollection.ID_FIELD_NAME,
new BsonDocument("first_name",new BsonString("Anne"))
.append("last_name",new BsonString("Kretchmar")))
.append("first_name",new BsonString("Anne"))
.append("last_name",new BsonString("Kretchmar"))
.append("email",new BsonString("[email protected]"))
.append("age", new BsonInt32(23))
.append("active", new BsonBoolean(true));
WriteModel<BsonDocument> result =
REPLACE_ONE_BUSINESS_KEY_STRATEGY.createWriteModel(new SinkDocument(null,valueDoc));
assertTrue(result instanceof ReplaceOneModel,
() -> "result expected to be of type ReplaceOneModel");
ReplaceOneModel<BsonDocument> writeModel =
(ReplaceOneModel<BsonDocument>) result;
assertEquals(REPLACEMENT_DOC_BUSINESS_KEY,writeModel.getReplacement(),
()-> "replacement doc not matching what is expected");
assertTrue(writeModel.getFilter() instanceof BsonDocument,
() -> "filter expected to be of type BsonDocument");
assertEquals(FILTER_DOC_REPLACE_BUSINESS_KEY,writeModel.getFilter());
assertTrue(writeModel.getOptions().isUpsert(),
() -> "replacement expected to be done in upsert mode");
}
/**
* Deletes a single synchronized document by its given id. No deletion will occur if the _id is
* not being synchronized.
*
* @param nsConfig the namespace synchronization config of the namespace where the document
* lives.
* @param documentId the _id of the document.
*/
@CheckReturnValue
private @Nullable
LocalSyncWriteModelContainer deleteOneFromResolution(
final NamespaceSynchronizationConfig nsConfig,
final BsonValue documentId,
final BsonDocument atVersion
) {
final MongoNamespace namespace = nsConfig.getNamespace();
final ChangeEvent<BsonDocument> event;
final Lock lock =
this.syncConfig.getNamespaceConfig(namespace).getLock().writeLock();
lock.lock();
final CoreDocumentSynchronizationConfig config;
try {
config = syncConfig.getSynchronizedDocument(namespace, documentId);
if (config == null) {
return null;
}
event = ChangeEvents.changeEventForLocalDelete(namespace, documentId, true);
config.setSomePendingWrites(logicalT, atVersion, 0L, event);
} finally {
lock.unlock();
}
final LocalSyncWriteModelContainer container = newWriteModelContainer(nsConfig);
container.addDocIDs(documentId);
container.addLocalWrite(new DeleteOneModel<>(getDocumentIdFilter(documentId)));
container.addLocalChangeEvent(event);
container.addConfigWrite(
new ReplaceOneModel<>(CoreDocumentSynchronizationConfig.getDocFilter(
namespace, config.getDocumentId()
), config));
return container;
}
/**
* Stores all documents in the stream. Documents that have an ID will replace existing
* documents with that ID, documents without ID will be inserted and MongoDB will assign it with
* a unique ID.
* @param topic topic to store the documents for
* @param docs documents to insert.
* @throws MongoException if a document could not be stored.
*/
public void store(String topic, Stream<Document> docs) throws MongoException {
getCollection(topic).bulkWrite(docs
.map(doc -> {
Object mongoId = doc.get(MONGO_ID_KEY);
if (mongoId != null) {
return new ReplaceOneModel<>(eq(MONGO_ID_KEY, mongoId), doc, UPDATE_UPSERT);
} else {
return new InsertOneModel<>(doc);
}
})
.collect(Collectors.toList()));
}
public void createOrUpdateBulk(final Collection<V> values, final long maxTime, final TimeUnit timeUnit) {
if (values.isEmpty()) {
return;
}
final List<ReplaceOneModel<Document>> bulkOperations = values.stream()
.map(value -> new ReplaceOneModel<>(
eq(ID, keyOf(value)),
encode(value),
BULK_UPSERT_OPERATION))
.collect(toList());
collectionWithWriteTimeout(maxTime, timeUnit)
.bulkWrite(bulkOperations, BULK_WRITE_OPTIONS);
}
/**
* Uses <a href="https://docs.mongodb.com/manual/reference/method/db.collection.replaceOne/">replaceOne</a> operation
* with <a href="https://docs.mongodb.com/manual/reference/method/db.collection.bulkWrite/">bulkWrite</a>. Right now has to convert
* object to BsonDocument to extract {@code _id} attribute.
*/
private <T> Publisher<WriteResult> update(StandardOperations.Update operation) {
ReplaceOptions options = new ReplaceOptions();
if (operation.upsert()) {
options.upsert(operation.upsert());
}
List<ReplaceOneModel<Object>> docs = operation.values().stream()
.map(value -> new ReplaceOneModel<>(new BsonDocument(Mongos.ID_FIELD_NAME, toBsonValue(keyExtractor.extract(value))), value, options))
.collect(Collectors.toList());
Publisher<BulkWriteResult> publisher = ((MongoCollection<Object>) collection).bulkWrite(docs);
return Flowable.fromPublisher(publisher).map(x -> WriteResult.unknown());
}