下面列出了 com.mongodb.client.model.ReplaceOptions 这个 API 类的使用示例代码及写法;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Persists a single entity: inserts it when its serialized form carries no ID,
 * otherwise replaces the document with that ID (creating it when absent).
 *
 * @param collection the reactive collection to write to
 * @param entity the entity to insert or replace
 * @return a {@code Uni} that discards the write result and continues with {@code null}
 */
private static Uni<Void> persistOrUpdate(ReactiveMongoCollection collection, Object entity) {
    // serialize the entity first so that its ID field can be inspected
    BsonValue entityId = getBsonDocument(collection, entity).get(ID);
    if (entityId != null) {
        // user-provided ID: replace the matching document, or insert it when none matches
        BsonDocument byId = new BsonDocument().append(ID, entityId);
        ReplaceOptions upsertOptions = new ReplaceOptions().upsert(true);
        return collection.replaceOne(byId, entity, upsertOptions)
                .onItem().ignore().andContinueWithNull();
    }
    // no ID yet: plain insert with an autogenerated ID
    return collection.insertOne(entity).onItem().ignore().andContinueWithNull();
}
/**
 * Persists a list of entities with a single bulk write: entities without an ID are
 * inserted, entities with an ID are replaced by ID (created when absent).
 *
 * @param collection the reactive collection to write to
 * @param entities the entities to insert or replace
 * @return a {@code Uni} that discards the bulk result and continues with {@code null}
 */
private static Uni<Void> persistOrUpdate(ReactiveMongoCollection collection, List<Object> entities) {
    // this will be an ordered bulk: slower than an unordered one, but it fails at the first failed write
    List<WriteModel> writes = new ArrayList<>();
    for (Object entity : entities) {
        // serialize the entity first so that its ID field can be inspected
        BsonValue entityId = getBsonDocument(collection, entity).get(ID);
        if (entityId != null) {
            // user-provided ID: replace the matching document, or insert it when none matches
            BsonDocument byId = new BsonDocument().append(ID, entityId);
            writes.add(new ReplaceOneModel(byId, entity, new ReplaceOptions().upsert(true)));
            continue;
        }
        // no ID yet: plain insert with an autogenerated ID
        writes.add(new InsertOneModel(entity));
    }
    return collection.bulkWrite(writes).onItem().ignore().andContinueWithNull();
}
/**
 * Persists a list of entities with a single blocking bulk write: entities without an ID
 * are inserted, entities with an ID are replaced by ID (created when absent).
 *
 * @param collection the collection to write to
 * @param entities the entities to insert or replace
 */
private static void persistOrUpdate(MongoCollection collection, List<Object> entities) {
    // this will be an ordered bulk: slower than an unordered one, but it fails at the first failed write
    List<WriteModel> writes = new ArrayList<>();
    for (Object entity : entities) {
        // serialize the entity first so that its ID field can be inspected
        BsonValue entityId = getBsonDocument(collection, entity).get(ID);
        if (entityId != null) {
            // user-provided ID: replace the matching document, or insert it when none matches
            BsonDocument byId = new BsonDocument().append(ID, entityId);
            writes.add(new ReplaceOneModel(byId, entity, new ReplaceOptions().upsert(true)));
            continue;
        }
        // no ID yet: plain insert with an autogenerated ID
        writes.add(new InsertOneModel(entity));
    }
    collection.bulkWrite(writes);
}
/**
 * Validates the entity and replaces the document with the same {@code _id},
 * inserting it when no such document exists. Timing is tracked and logged
 * whether or not the write succeeds.
 *
 * @param entity the entity to store; must carry a non-null id
 */
@Override
public void replace(T entity) {
    var stopWatch = new StopWatch();
    Object entityId = null;
    validator.validate(entity);
    try {
        entityId = mongo.codecs.id(entity);
        if (entityId == null) {
            throw new Error(format("entity must have id, entityClass={}", entityClass.getCanonicalName()));
        }
        collection().replaceOne(Filters.eq("_id", entityId), entity, new ReplaceOptions().upsert(true));
    } finally {
        // runs on failure too, so slow or failed operations are still tracked
        long elapsedTime = stopWatch.elapsed();
        ActionLogContext.track("mongo", elapsedTime, 0, 1);
        logger.debug("replace, collection={}, id={}, elapsed={}", collectionName, entityId, elapsedTime);
        checkSlowOperation(elapsedTime);
    }
}
/**
 * Saves an entity: inserts it when its mapped id field is {@code null}, otherwise
 * replaces the document with that {@code _id} (creating it when absent).
 *
 * @param entity the entity to save
 * @param collection the collection to write to
 * @param options insert options; also supply the session, write concern and
 *                document-validation bypass used for the replace path
 * @param <T> the entity type
 */
protected <T> void saveDocument(final T entity, final MongoCollection<T> collection, final InsertOneOptions options) {
    final Object idValue = mapper.getMappedClass(entity.getClass()).getIdField().getFieldValue(entity);
    final ClientSession session = findSession(options);

    if (idValue == null) {
        // no id yet: plain insert, with or without a session
        final MongoCollection<T> prepared = options.prepare(collection);
        if (session != null) {
            prepared.insertOne(session, entity, options.getOptions());
        } else {
            prepared.insertOne(entity, options.getOptions());
        }
        return;
    }

    // id present: replace by _id, inserting when no document matches
    final ReplaceOptions replaceOptions = new ReplaceOptions()
            .bypassDocumentValidation(options.getBypassDocumentValidation())
            .upsert(true);
    final MongoCollection<T> target = options.writeConcern() != null
            ? collection.withWriteConcern(options.writeConcern())
            : collection;
    final Document idFilter = new Document("_id", idValue);
    if (session != null) {
        target.replaceOne(session, idFilter, entity, replaceOptions);
    } else {
        target.replaceOne(idFilter, entity, replaceOptions);
    }
}
/**
 * Seeds four documents, then checks that {@code replaceOne} reports one matched and one
 * modified document for existing ids (with and without options) and zero for an id that
 * does not exist.
 */
@Test
void replaceOne() {
    ReactiveMongoCollection<Document> heroes = client.getDatabase(DATABASE).getCollection("test");

    Document superman = new Document("id", 1).append("name", "superman").append("type", "heroes")
            .append("stars", 5);
    Document batman = new Document("id", 2).append("name", "batman").append("type", "heroes")
            .append("stars", 4);
    Document frogman = new Document("id", 3).append("name", "frogman").append("type", "villain")
            .append("stars", 1);
    Document joker = new Document("id", 4).append("name", "joker").append("type", "villain")
            .append("stars", 5);

    // wait for all four seed inserts before replacing anything
    CompletableFuture.allOf(
            heroes.insertOne(superman).subscribeAsCompletionStage(),
            heroes.insertOne(batman).subscribeAsCompletionStage(),
            heroes.insertOne(frogman).subscribeAsCompletionStage(),
            heroes.insertOne(joker).subscribeAsCompletionStage())
            .join();

    Document newVillain = new Document("id", 5).append("name", "lex lutor").append("type", "villain")
            .append("stars", 3);
    Document newHeroes = new Document("id", 6).append("name", "supergirl").append("type", "heroes")
            .append("stars", 2);

    ReplaceOptions bypassValidation = new ReplaceOptions().bypassDocumentValidation(true);
    UpdateResult withOptions = heroes.replaceOne(new Document("id", 3), newVillain, bypassValidation)
            .await().indefinitely();
    UpdateResult withoutOptions = heroes.replaceOne(new Document("id", 2), newHeroes)
            .await().indefinitely();
    // id 50 was never inserted, so nothing should match
    UpdateResult noMatch = heroes.replaceOne(new Document("id", 50), newHeroes)
            .await().indefinitely();

    assertThat(withOptions.getMatchedCount()).isEqualTo(1);
    assertThat(withOptions.getModifiedCount()).isEqualTo(1);
    assertThat(withoutOptions.getMatchedCount()).isEqualTo(1);
    assertThat(withoutOptions.getModifiedCount()).isEqualTo(1);
    assertThat(noMatch.getMatchedCount()).isEqualTo(0);
    assertThat(noMatch.getModifiedCount()).isEqualTo(0);
}
/**
 * Persists a single entity with a blocking write: inserts it when its serialized form
 * carries no ID, otherwise replaces the document with that ID (creating it when absent).
 *
 * @param collection the collection to write to
 * @param entity the entity to insert or replace
 */
private static void persistOrUpdate(MongoCollection collection, Object entity) {
    // serialize the entity first so that its ID field can be inspected
    BsonValue entityId = getBsonDocument(collection, entity).get(ID);
    if (entityId != null) {
        // user-provided ID: replace the matching document, or insert it when none matches
        BsonDocument byId = new BsonDocument().append(ID, entityId);
        ReplaceOptions upsertOptions = new ReplaceOptions().upsert(true);
        collection.replaceOne(byId, entity, upsertOptions);
        return;
    }
    // no ID yet: plain insert with an autogenerated ID
    collection.insertOne(entity);
}
/**
 * Updates or inserts a document: replaces one document matching {@code filter} with
 * {@code document}, inserting {@code document} when nothing matches.
 *
 * @param dbName name of the database
 * @param collectionName name of the collection
 * @param document the replacement document; when {@code null} nothing is written
 * @param filter the query selecting the document to replace
 * @return {@code true} when the replace was issued, {@code false} when {@code document} is {@code null}
 */
public boolean upsert(String dbName, String collectionName, Document document, Document filter) {
    if (document == null) {
        return false;
    }
    ReplaceOptions upsertOptions = new ReplaceOptions().upsert(true);
    mongoClient.getDatabase(dbName).getCollection(collectionName).replaceOne(filter, document, upsertOptions);
    return true;
}
/**
 * Upserts a batch of bars with one bulk write. Each bar replaces (or creates) the
 * document that has the same {@code unifiedSymbol} and {@code actionTimestamp}.
 * An ascending index on {@code actionTimestamp, unifiedSymbol} is requested before
 * the write, and both the conversion time and the database time are logged.
 *
 * @param dbName name of the database
 * @param collectionName name of the collection
 * @param barList the bars to write
 * @return {@code false} when {@code barList} is {@code null} or empty, otherwise {@code true}
 */
@Override
public boolean upsertBar(String dbName, String collectionName, List<BarField> barList) {
    if (barList == null || barList.isEmpty()) {
        logger.error("更新插入Bar集合错误,数据集合为空");
        return false;
    }

    // phase 1: convert every bar into an upserting replace model
    long convertStart = System.currentTimeMillis();
    List<WriteModel<Document>> operations = new ArrayList<WriteModel<Document>>();
    for (BarField bar : barList) {
        Document byKey = new Document("unifiedSymbol", bar.getUnifiedSymbol())
                .append("actionTimestamp", bar.getActionTimestamp());
        Document replacement = barToDocument(bar);
        ReplaceOptions upsertOptions = new ReplaceOptions().upsert(true);
        operations.add(new ReplaceOneModel<Document>(byKey, replacement, upsertOptions));
    }
    long convertElapsed = System.currentTimeMillis() - convertStart;
    logger.info("更新插入Bar集合,数据库{},集合{},数据转换耗时{}ms,共{}条数据", dbName, collectionName, convertElapsed, barList.size());

    // phase 2: request the index, then send all operations in one bulk write
    long writeStart = System.currentTimeMillis();
    todayMarketDataDBClient.getDatabase(dbName).getCollection(collectionName)
            .createIndex(Indexes.ascending("actionTimestamp", "unifiedSymbol"));
    todayMarketDataDBClient.getDatabase(dbName).getCollection(collectionName)
            .bulkWrite(operations);
    long writeElapsed = System.currentTimeMillis() - writeStart;
    logger.info("更新插入Bar集合,数据库{},集合{},数据库操作耗时{}ms,共{}条操作", dbName, collectionName, writeElapsed, operations.size());
    return true;
}
/**
 * Upserts a batch of ticks with one bulk write. Each tick replaces (or creates) the
 * document that has the same {@code unifiedSymbol} and {@code actionTimestamp}.
 * An ascending index on {@code actionTimestamp} is requested before the write, and
 * both the conversion time and the database time are logged.
 *
 * @param dbName name of the database
 * @param collectionName name of the collection
 * @param tickList the ticks to write
 * @return {@code false} when {@code tickList} is {@code null} or empty, otherwise {@code true}
 */
@Override
public boolean upsertTick(String dbName, String collectionName, List<TickField> tickList) {
    if (tickList == null || tickList.isEmpty()) {
        logger.error("更新插入Tick集合错误,数据集合为空");
        return false;
    }

    // phase 1: convert every tick into an upserting replace model
    long convertStart = System.currentTimeMillis();
    List<WriteModel<Document>> operations = new ArrayList<WriteModel<Document>>();
    for (TickField tick : tickList) {
        Document byKey = new Document("unifiedSymbol", tick.getUnifiedSymbol())
                .append("actionTimestamp", tick.getActionTimestamp());
        Document replacement = tickToDocument(tick);
        ReplaceOptions upsertOptions = new ReplaceOptions().upsert(true);
        operations.add(new ReplaceOneModel<Document>(byKey, replacement, upsertOptions));
    }
    long convertElapsed = System.currentTimeMillis() - convertStart;
    logger.info("更新插入Tick集合,数据库{},集合{},数据转换耗时{}ms,共{}条数据", dbName, collectionName, convertElapsed, tickList.size());

    // phase 2: request the index, then send all operations in one bulk write
    long writeStart = System.currentTimeMillis();
    todayMarketDataDBClient.getDatabase(dbName).getCollection(collectionName)
            .createIndex(Indexes.ascending("actionTimestamp"));
    todayMarketDataDBClient.getDatabase(dbName).getCollection(collectionName)
            .bulkWrite(operations);
    long writeElapsed = System.currentTimeMillis() - writeStart;
    logger.info("更新插入Tick集合,数据库{},集合{},数据库操作耗时{}ms,共{}条操作", dbName, collectionName, writeElapsed, operations.size());
    return true;
}
/**
 * Stores this object's document under the fixed {@code _id} value {@code "historic"},
 * replacing the existing document or inserting it when none exists.
 *
 * @return a {@code Mono} that emits no value; the write result is only logged at trace level
 */
@Override
public Mono<Void> insert() {
LOGGER.debug("[LotteryHistoric] Insertion");
return Mono.from(DatabaseManager.getLottery()
.getCollection()
// upsert: there is a single "historic" document, created on first write
.replaceOne(Filters.eq("_id", "historic"),
this.toDocument(),
new ReplaceOptions().upsert(true)))
.doOnNext(result -> LOGGER.trace("[LotteryHistoric] Insertion result: {}", result))
.then()
// count the request whenever the pipeline terminates
.doOnTerminate(() -> DB_REQUEST_COUNTER.labels(LotteryCollection.NAME).inc());
}
/**
 * Writes a user to the {@code users} collection while holding the user's IO lock.
 * Users that the user manager says should not be saved are deleted instead.
 *
 * @param user the user to save or delete
 */
@Override
public void saveUser(User user) {
    user.getIoLock().lock();
    try {
        MongoCollection<Document> users = this.database.getCollection(this.prefix + "users");
        Document byUniqueId = new Document("_id", user.getUniqueId());
        if (this.plugin.getUserManager().shouldSave(user)) {
            // replace the stored document, creating it when absent
            users.replaceOne(byUniqueId, userToDoc(user), new ReplaceOptions().upsert(true));
        } else {
            users.deleteOne(byUniqueId);
        }
    } finally {
        user.getIoLock().unlock();
    }
}
/**
 * Writes a group to the {@code groups} collection while holding the group's IO lock,
 * replacing the document keyed by the group name or creating it when absent.
 *
 * @param group the group to save
 */
@Override
public void saveGroup(Group group) {
    group.getIoLock().lock();
    try {
        MongoCollection<Document> groups = this.database.getCollection(this.prefix + "groups");
        Document byName = new Document("_id", group.getName());
        ReplaceOptions upsertOptions = new ReplaceOptions().upsert(true);
        groups.replaceOne(byName, groupToDoc(group), upsertOptions);
    } finally {
        group.getIoLock().unlock();
    }
}
/**
 * Records the username currently associated with a UUID in the {@code uuid} collection
 * and removes mappings of other UUIDs that claim the same username.
 *
 * @param uniqueId the player's UUID
 * @param username the player's username; stored lower-cased
 * @return the base result computed from the new and previous username, extended with the
 *         conflicting UUIDs when other UUIDs were mapped to the same username
 */
@Override
public PlayerSaveResult savePlayerData(UUID uniqueId, String username) {
    // Locale.ROOT keeps the lower-casing independent of the JVM default locale
    // (e.g. under a Turkish locale "I" would otherwise become a dotless "ı")
    username = username.toLowerCase(java.util.Locale.ROOT);
    MongoCollection<Document> c = this.database.getCollection(this.prefix + "uuid");

    // find any existing mapping
    String oldUsername = getPlayerName(uniqueId);

    // do the insert, only when the stored name actually differs
    if (!username.equalsIgnoreCase(oldUsername)) {
        c.replaceOne(new Document("_id", uniqueId), new Document("_id", uniqueId).append("name", username), new ReplaceOptions().upsert(true));
    }

    PlayerSaveResultImpl result = PlayerSaveResultImpl.determineBaseResult(username, oldUsername);

    // collect every uuid currently mapped to this username
    Set<UUID> conflicting = new HashSet<>();
    try (MongoCursor<Document> cursor = c.find(new Document("name", username)).iterator()) {
        while (cursor.hasNext()) {
            conflicting.add(getDocumentId(cursor.next()));
        }
    }
    conflicting.remove(uniqueId);

    if (!conflicting.isEmpty()) {
        // remove the mappings for conflicting uuids.
        // This must match ANY of the ids: an $and of several "_id == x" clauses can never
        // match a document once there are two or more conflicting uuids.
        c.deleteMany(Filters.in("_id", conflicting));
        result = result.withOtherUuidsPresent(conflicting);
    }

    return result;
}
/**
 * Replaces the stored details of this Rya instance with {@code newDetails}, provided the
 * stored document still equals {@code oldDetails}.
 *
 * @param oldDetails the details the caller believes are currently stored; must not be null
 * @param newDetails the details to store; must not be null and must carry this repository's instance name
 * @throws NotInitializedException if the instance has not been initialized
 * @throws ConcurrentUpdateException if the replace did not modify exactly one document
 * @throws RyaDetailsRepositoryException if {@code newDetails} names a different instance
 */
@Override
public void update(final RyaDetails oldDetails, final RyaDetails newDetails)
        throws NotInitializedException, ConcurrentUpdateException, RyaDetailsRepositoryException {
    // Preconditions.
    requireNonNull(oldDetails);
    requireNonNull(newDetails);

    if (!newDetails.getRyaInstanceName().equals(instanceName)) {
        throw new RyaDetailsRepositoryException("The instance name that was in the provided 'newDetails' does not match " +
                "the instance name that this repository is connected to. Make sure you're connected to the " +
                "correct Rya instance.");
    }

    if (!isInitialized()) {
        throw new NotInitializedException("Could not update the details for the Rya instance named '" +
                instanceName + "' because it has not been initialized yet.");
    }

    // Nothing to write when the details are unchanged.
    if (oldDetails.equals(newDetails)) {
        return;
    }

    final MongoCollection<Document> col = db.getCollection(INSTANCE_DETAILS_COLLECTION_NAME);
    final Document oldObj = MongoDetailsAdapter.toDocument(oldDetails);
    final Document newObj = MongoDetailsAdapter.toDocument(newDetails);
    // The old document is the filter, so the replace only applies while the stored
    // document still matches what the caller read.
    final UpdateResult result = col.replaceOne(oldObj, newObj, new ReplaceOptions());

    // since there is only 1 document, there should only be 1 update.
    if (result.getModifiedCount() != 1) {
        throw new ConcurrentUpdateException("Could not update the details for the Rya instance named '" +
                instanceName + "' because the old value is out of date.");
    }
}
/**
 * Replaces a document by delegating to the wrapped callback-based collection and
 * exposing the outcome as a {@code Publisher}.
 *
 * @param filter the query filter selecting the document to replace
 * @param replacement the replacement document
 * @param options the options to apply to the replace operation
 * @return a publisher of the {@code UpdateResult}
 */
@Override
public Publisher<UpdateResult> replaceOne(final Bson filter, final TDocument replacement, final ReplaceOptions options) {
    // the callback-style call, invoked with the callback the observable supplies
    final Block<com.mongodb.async.SingleResultCallback<UpdateResult>> operation =
            new Block<com.mongodb.async.SingleResultCallback<UpdateResult>>() {
                @Override
                public void apply(final com.mongodb.async.SingleResultCallback<UpdateResult> resultCallback) {
                    wrapped.replaceOne(filter, replacement, options, resultCallback);
                }
            };
    return new ObservableToPublisher<UpdateResult>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Replaces a document within a client session by delegating to the wrapped
 * callback-based collection and exposing the outcome as a {@code Publisher}.
 *
 * @param clientSession the client session with which to associate this operation
 * @param filter the query filter selecting the document to replace
 * @param replacement the replacement document
 * @param options the options to apply to the replace operation
 * @return a publisher of the {@code UpdateResult}
 */
@Override
public Publisher<UpdateResult> replaceOne(final ClientSession clientSession, final Bson filter, final TDocument replacement,
                                          final ReplaceOptions options) {
    // the callback-style call, invoked with the callback the observable supplies
    final Block<com.mongodb.async.SingleResultCallback<UpdateResult>> operation =
            new Block<com.mongodb.async.SingleResultCallback<UpdateResult>>() {
                @Override
                public void apply(final com.mongodb.async.SingleResultCallback<UpdateResult> resultCallback) {
                    // the wrapped collection expects the underlying session, not this wrapper
                    wrapped.replaceOne(clientSession.getWrapped(), filter, replacement, options, resultCallback);
                }
            };
    return new ObservableToPublisher<UpdateResult>(com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Uses <a href="https://docs.mongodb.com/manual/reference/method/db.collection.replaceOne/">replaceOne</a> operation
 * with <a href="https://docs.mongodb.com/manual/reference/method/db.collection.bulkWrite/">bulkWrite</a>. Right now has to convert
 * object to BsonDocument to extract {@code _id} attribute.
 *
 * <p>Each value becomes one replace model filtered on its extracted key; the upsert flag
 * is taken from the operation. The bulk result is mapped to {@code WriteResult.unknown()}.
 *
 * @param operation the update operation providing the values and the upsert flag
 * @return a publisher emitting {@code WriteResult.unknown()} for the bulk result
 */
private <T> Publisher<WriteResult> update(StandardOperations.Update operation) {
    // upsert defaults to false, so setting it unconditionally matches setting it only when true
    final ReplaceOptions replaceOptions = new ReplaceOptions().upsert(operation.upsert());

    final List<ReplaceOneModel<Object>> replacements = operation.values().stream()
            .map(value -> {
                final BsonDocument byId =
                        new BsonDocument(Mongos.ID_FIELD_NAME, toBsonValue(keyExtractor.extract(value)));
                return new ReplaceOneModel<Object>(byId, value, replaceOptions);
            })
            .collect(Collectors.toList());

    final Publisher<BulkWriteResult> bulkResult = ((MongoCollection<Object>) collection).bulkWrite(replacements);
    return Flowable.fromPublisher(bulkResult).map(ignored -> WriteResult.unknown());
}
/**
 * Replaces a document by delegating to the underlying collection and passing the
 * result through {@code Wrappers.toUni}.
 *
 * @param filter the query filter selecting the document to replace
 * @param replacement the replacement document
 * @param options the options to apply to the replace operation
 * @return a {@code Uni} of the {@code UpdateResult}
 */
@Override
public Uni<UpdateResult> replaceOne(Bson filter, T replacement, ReplaceOptions options) {
return Wrappers.toUni(collection.replaceOne(filter, replacement, options));
}
/**
 * Replaces a document within a client session by delegating to the underlying
 * collection and passing the result through {@code Wrappers.toUni}.
 *
 * @param clientSession the client session with which to associate this operation
 * @param filter the query filter selecting the document to replace
 * @param replacement the replacement document
 * @param options the options to apply to the replace operation
 * @return a {@code Uni} of the {@code UpdateResult}
 */
@Override
public Uni<UpdateResult> replaceOne(ClientSession clientSession, Bson filter, T replacement,
ReplaceOptions options) {
return Wrappers.toUni(collection.replaceOne(clientSession, filter, replacement, options));
}
/**
 * Builds the write models for replacing (or upserting) a single synchronized document,
 * identified by its id, with a resolved full document. Nothing is built and {@code null}
 * is returned if the _id is not being synchronized.
 *
 * <p>The returned container holds an upserting replace of the local document, the local
 * change event describing that write, and a replace of the document's synchronization
 * config. This method only adds the writes to the container; it does not execute them.
 *
 * @param nsConfig the namespace sync configuration of the namespace where the document lives.
 * @param documentId the _id of the document.
 * @param document the replacement document.
 * @param atVersion the version passed to the document config along with the pending write.
 * @param remoteEvent the remote change event; its operation type and full document decide
 *                    which kind of local change event (insert, update or replace) is created.
 * @return the container with the local write, change event and config write, or
 *         {@code null} if the document is not synchronized.
 */
@CheckReturnValue
private LocalSyncWriteModelContainer updateOrUpsertOneFromResolution(
final NamespaceSynchronizationConfig nsConfig,
final BsonValue documentId,
final BsonDocument document,
final BsonDocument atVersion,
final CompactChangeEvent<BsonDocument> remoteEvent
) {
final MongoNamespace namespace = nsConfig.getNamespace();
final ChangeEvent<BsonDocument> event;
// The namespace write lock is held only while the config is looked up and the change
// event is built; it is released before the pending write is recorded below.
final Lock lock =
this.syncConfig.getNamespaceConfig(namespace).getLock().writeLock();
lock.lock();
final CoreDocumentSynchronizationConfig config;
final BsonDocument docForStorage;
try {
config =
syncConfig.getSynchronizedDocument(namespace, documentId);
if (config == null) {
return null;
}
// Remove forbidden fields from the resolved document before it is updated/upserted in the
// local collection.
docForStorage = sanitizeDocument(document);
// NOTE(review): the _id is added to 'document' after 'docForStorage' was derived from it;
// whether docForStorage also receives the _id depends on whether sanitizeDocument returns
// a copy - confirm.
if (document.get("_id") == null && remoteEvent.getDocumentKey().get("_id") != null) {
document.put("_id", remoteEvent.getDocumentKey().get("_id"));
}
if (remoteEvent.getOperationType() == OperationType.DELETE) {
// The remote event was a delete, so the local write is reported as an insert.
event = ChangeEvents.changeEventForLocalInsert(namespace, docForStorage, true);
} else {
if (remoteEvent.getFullDocument() != null) {
// The remote full document is known, so an update description can be computed as the
// diff between the sanitized remote document and the document being stored.
event = ChangeEvents.changeEventForLocalUpdate(
namespace,
documentId,
sanitizeUpdateDescription(UpdateDescription.diff(
sanitizeDocument(remoteEvent.getFullDocument()),
docForStorage)),
docForStorage,
true);
} else {
// We can't compute an update description here since we don't know what the remote full
// document looks like. This means that if rules prevent us from writing to the entire
// document, we would need to fetch the document to compute the update description, which
// is not something we want to do. One potential option to get around this would be to
// change the server-side logic for replaces to add a "merge" argument, that would
// translate the replace sent by this client to an update that only modifies the fields
// it has the permission to see.
// See STITCH-2888
// NOTE(review): this branch passes 'document' rather than 'docForStorage', unlike the
// other two branches - confirm this is intended.
event = ChangeEvents.changeEventForLocalReplace(
namespace,
documentId,
document,
true
);
}
}
} finally {
lock.unlock();
}
// Record the pending write on the document config, keyed by the hash of the stored document.
config.setSomePendingWrites(
logicalT,
atVersion,
HashUtils.hash(docForStorage),
event);
final LocalSyncWriteModelContainer syncWriteModelContainer = newWriteModelContainer(nsConfig);
syncWriteModelContainer.addDocIDs(documentId);
// Upsert so the local document is created when it does not exist yet.
syncWriteModelContainer.addLocalWrite(
new ReplaceOneModel<>(getDocumentIdFilter(documentId), docForStorage,
new ReplaceOptions().upsert(true)));
syncWriteModelContainer.addLocalChangeEvent(event);
// Replace the stored config document with the config modified above.
syncWriteModelContainer.addConfigWrite(
new ReplaceOneModel<>(CoreDocumentSynchronizationConfig.getDocFilter(
namespace, config.getDocumentId()), config));
return syncWriteModelContainer;
}
/**
 * Builds the write models for replacing (or upserting) a single synchronized document,
 * identified by its id, with the given full remote document. Nothing is built and
 * {@code null} is returned if the _id is not being synchronized.
 *
 * <p>The returned container holds an upserting replace of the local document, the local
 * change event describing that write, and a replace of the document's synchronization
 * config. This method only adds the writes to the container; it does not execute them.
 *
 * @param nsConfig the namespace synchronization config of the namespace where the document
 *                 lives.
 * @param documentId the _id of the document.
 * @param remoteDocument the replacement document.
 * @param atVersion the version recorded on the document config when its pending writes
 *                  are marked complete.
 * @return the container with the local write, change event and config write, or
 *         {@code null} if the document is not synchronized.
 */
@CheckReturnValue
private LocalSyncWriteModelContainer replaceOrUpsertOneFromRemote(
final NamespaceSynchronizationConfig nsConfig,
final BsonValue documentId,
final BsonDocument remoteDocument,
final BsonDocument atVersion
) {
final MongoNamespace namespace = nsConfig.getNamespace();
final ChangeEvent<BsonDocument> event;
// The namespace write lock covers the config lookup, the config update and the creation
// of the change event.
final Lock lock =
this.syncConfig.getNamespaceConfig(namespace).getLock().writeLock();
lock.lock();
final BsonDocument docForStorage;
final CoreDocumentSynchronizationConfig config;
try {
config = syncConfig.getSynchronizedDocument(namespace, documentId);
if (config == null) {
return null;
}
// Store the sanitized form of the remote document, not the raw one.
docForStorage = sanitizeDocument(remoteDocument);
// Mark pending writes complete, recording the hash of the stored document and the version.
config.setPendingWritesComplete(HashUtils.hash(docForStorage), atVersion);
event = ChangeEvents.changeEventForLocalReplace(namespace, documentId, docForStorage, false);
} finally {
lock.unlock();
}
final LocalSyncWriteModelContainer container = newWriteModelContainer(nsConfig);
container.addDocIDs(documentId);
// Upsert so the local document is created when it does not exist yet.
container.addLocalWrite(new ReplaceOneModel<>(
getDocumentIdFilter(documentId),
docForStorage,
new ReplaceOptions().upsert(true)));
container.addLocalChangeEvent(event);
// Replace the stored config document with the config modified above.
container.addConfigWrite(new ReplaceOneModel<>(
CoreDocumentSynchronizationConfig.getDocFilter(namespace, config.getDocumentId()
), config));
return container;
}
/**
 * Creates replace options with upsert enabled.
 *
 * @return a new {@code ReplaceOptions} instance whose upsert flag is {@code true}
 */
private static ReplaceOptions upsert() {
    final ReplaceOptions options = new ReplaceOptions();
    return options.upsert(true);
}
/**
 * Stores a value: replaces the document with the value's key, or inserts it when no
 * such document exists.
 *
 * @param value the value to store
 * @param maxTime passed to {@code collectionWithWriteTimeout} (presumably the write timeout; confirm)
 * @param timeUnit the unit of {@code maxTime}
 * @return the value decoded back from the document that was written
 */
public V createOrUpdate(final V value, final long maxTime, final TimeUnit timeUnit) {
    final Document encoded = encode(value);
    final ReplaceOptions upsertOptions = new ReplaceOptions().upsert(true);
    collectionWithWriteTimeout(maxTime, timeUnit).replaceOne(byId(keyOf(value)), encoded, upsertOptions);
    return decode(encoded);
}
/**
 * Replaces a document using default replace options.
 *
 * @param filter the query filter selecting the document to replace
 * @param replacement the replacement document
 * @return a publisher of the {@code UpdateResult}
 */
@Override
public Publisher<UpdateResult> replaceOne(final Bson filter, final TDocument replacement) {
    final ReplaceOptions defaultOptions = new ReplaceOptions();
    return replaceOne(filter, replacement, defaultOptions);
}
/**
 * Replaces a document within a client session using default replace options.
 *
 * @param clientSession the client session with which to associate this operation
 * @param filter the query filter selecting the document to replace
 * @param replacement the replacement document
 * @return a publisher of the {@code UpdateResult}
 */
@Override
public Publisher<UpdateResult> replaceOne(final ClientSession clientSession, final Bson filter, final TDocument replacement) {
    final ReplaceOptions defaultOptions = new ReplaceOptions();
    return replaceOne(clientSession, filter, replacement, defaultOptions);
}
/**
 * Replace a document in the collection according to the specified arguments.
 *
 * @param filter the query filter to apply to the replace operation
 * @param replacement the replacement document
 * @param options the options to apply to the replace operation
 * @return a {@link Uni} receiving the {@link UpdateResult}
 */
Uni<UpdateResult> replaceOne(Bson filter, T replacement, ReplaceOptions options);
/**
 * Replace a document in the collection according to the specified arguments.
 *
 * @param clientSession the client session with which to associate this operation
 * @param filter the query filter to apply to the replace operation
 * @param replacement the replacement document
 * @param options the options to apply to the replace operation
 * @return a {@link Uni} receiving the {@link UpdateResult}
 */
Uni<UpdateResult> replaceOne(ClientSession clientSession, Bson filter, T replacement,
ReplaceOptions options);
/**
 * Replace a document in the collection according to the specified arguments.
 *
 * @param filter the query filter to apply to the replace operation
 * @param replacement the replacement document
 * @param options the options to apply to the replace operation
 * @return a publisher with a single element, the UpdateResult
 * @mongodb.driver.manual tutorial/modify-documents/#replace-the-document Replace
 * @since 1.8
 */
Publisher<UpdateResult> replaceOne(Bson filter, TDocument replacement, ReplaceOptions options);
/**
 * Replace a document in the collection according to the specified arguments.
 *
 * @param clientSession the client session with which to associate this operation
 * @param filter the query filter to apply to the replace operation
 * @param replacement the replacement document
 * @param options the options to apply to the replace operation
 * @return a publisher with a single element, the UpdateResult
 * @mongodb.driver.manual tutorial/modify-documents/#replace-the-document Replace
 * @mongodb.server.release 3.6
 * @since 1.8
 */
Publisher<UpdateResult> replaceOne(ClientSession clientSession, Bson filter, TDocument replacement, ReplaceOptions options);