下面列出了怎么用com.mongodb.client.model.UpdateOptions的API类实例代码及写法,或者点击链接到github查看源代码。
protected final FluentFuture<Integer> doUpdate(
final Constraints.ConstraintHost criteria,
final Constraints.Constraint update,
final UpdateOptions options) {
checkNotNull(criteria, "criteria");
checkNotNull(update, "update");
checkNotNull(options, "options");
return submit(new Callable<UpdateResult>() {
@Override
public UpdateResult call() {
return collection()
.updateMany(
convertToBson(criteria),
convertToBson(update),
options);
}
}).lazyTransform(new Function<UpdateResult, Integer>() {
@Override
public Integer apply(UpdateResult input) {
return (int) input.getModifiedCount();
}
});
}
/**
* Adds a new edit to the batch
*
* @param key the key to write
* @param value the value to write. Null indicates we should delete this key
* @return this
*/
public WriteBatch addEdit(byte[] key, byte[] value) {
if (value == null) {
DeleteOneModel deleteModel =
new DeleteOneModel<>(eq(MongoConstants.ID_FIELD_NAME, new BsonBinary(key)));
edits.add(deleteModel);
} else {
UpdateOneModel updateModel =
new UpdateOneModel<>(
eq(MongoConstants.ID_FIELD_NAME, new BsonBinary(key)),
Updates.set(MongoConstants.VALUE_FIELD_NAME, new BsonBinary(value)),
new UpdateOptions().upsert(true));
edits.add(updateModel);
}
return this;
}
private void storeIndexSettings() {
indexLock.writeLock().lock();
try {
MongoDatabase db = mongo.getDatabase(mongoConfig.getDatabaseName());
MongoCollection<Document> dbCollection = db.getCollection(indexConfig.getIndexName() + CONFIG_SUFFIX);
Document settings = IndexConfigUtil.toDocument(indexConfig);
settings.put(MongoConstants.StandardFields._ID, SETTINGS_ID);
Document query = new Document();
query.put(MongoConstants.StandardFields._ID, SETTINGS_ID);
dbCollection.replaceOne(query, settings, new UpdateOptions().upsert(true));
}
finally {
indexLock.writeLock().unlock();
}
}
private Uni<Document> upsertDoc(String collection, Document docToInsert,
Document insertStatement,
String expectedId) {
return client.getDatabase(DATABASE).getCollection(collection)
.updateMany(eq("foo", docToInsert.getString("foo")),
insertStatement,
new UpdateOptions().upsert(true))
.onItem().produceUni(result -> {
assertThat(result.getModifiedCount()).isEqualTo(0);
if (expectedId == null) {
assertThat(0).isEqualTo(result.getMatchedCount());
assertThat(result.getUpsertedId()).isNotNull();
} else {
assertThat(1).isEqualTo(result.getMatchedCount());
assertThat(result.getUpsertedId()).isNull();
}
return client.getDatabase(DATABASE).getCollection(collection).find().collectItems().first();
});
}
private Mono<UpdateResult> updateAchievement(int achievements) {
// If the achievement is already in this state, no need to request an update
if (this.getBean().getAchievements() == achievements) {
LOGGER.debug("[DBUser {}] Achievements update useless, aborting: {}",
this.getId().asLong(), achievements);
return Mono.empty();
}
LOGGER.debug("[DBUser {}] Achievements update: {}", this.getId().asLong(), achievements);
return Mono.from(DatabaseManager.getUsers()
.getCollection()
.updateOne(
Filters.eq("_id", this.getId().asString()),
Updates.set("achievements", achievements),
new UpdateOptions().upsert(true)))
.doOnNext(result -> LOGGER.trace("[DBUser {}] Achievements update result: {}",
this.getId().asLong(), result))
.doOnTerminate(() -> DB_REQUEST_COUNTER.labels(UsersCollection.NAME).inc());
}
@Override
public void storeSourceDocument(String uniqueId, long timeStamp, Document document, List<Metadata> metaDataList) throws Exception {
MongoDatabase db = mongoClient.getDatabase(database);
MongoCollection<Document> coll = db.getCollection(rawCollectionName);
Document mongoDocument = new Document();
mongoDocument.putAll(document);
if (!metaDataList.isEmpty()) {
Document metadataMongoDoc = new Document();
for (Metadata meta : metaDataList) {
metadataMongoDoc.put(meta.getKey(), meta.getValue());
}
mongoDocument.put(METADATA, metadataMongoDoc);
}
mongoDocument.put(TIMESTAMP, timeStamp);
mongoDocument.put(MongoConstants.StandardFields._ID, uniqueId);
Document query = new Document(MongoConstants.StandardFields._ID, uniqueId);
coll.replaceOne(query, mongoDocument, new UpdateOptions().upsert(true));
}
public static void storeBlock(MongoBlock mongoBlock) {
// System.out.println("Store: " + mongoBlock.getBlockNumber());
MongoCollection<Document> c = mongoBlock.mongoFile.mongoDirectory.getBlocksCollection();
Document query = new Document();
query.put(MongoDirectory.FILE_NUMBER, mongoBlock.mongoFile.fileNumber);
query.put(MongoDirectory.BLOCK_NUMBER, mongoBlock.blockNumber);
Document object = new Document();
object.put(MongoDirectory.FILE_NUMBER, mongoBlock.mongoFile.fileNumber);
object.put(MongoDirectory.BLOCK_NUMBER, mongoBlock.blockNumber);
object.put(MongoDirectory.BYTES, new Binary(mongoBlock.bytes));
c.replaceOne(query, object, new UpdateOptions().upsert(true));
}
/**
* create until parent folder if not exists.
*
* @param splitPath path to completed.
* @return direct parent folder id
*/
private String completeFolder(String[] splitPath) {
String pId = "0";
for (String currentPath : splitPath) {
Document query = new Document(Fields.PID, pId)
.append(Fields.IS_DIR, true)
.append(Fields.NAME, currentPath);
String cId = new ObjectId().toString();
Document doc = new Document("$setOnInsert",
new Document(Fields.ID, cId)
.append(Fields.PID, pId)
.append(Fields.IS_DIR, true)
.append(Fields.NAME, currentPath));
Document exist = folders.find(query).first();
if (exist == null) {
folders.updateOne(query, doc, new UpdateOptions().upsert(true));
pId = cId;
} else {
pId = exist.getString(Fields.ID);
}
}
return pId;
}
private void insertContextDataAggregatedForResoultion(String dbName, String collectionName,
GregorianCalendar calendar, String entityId, String entityType, String attrName, String attrType,
double max, double min, double sum, double sum2, int numSamples, Resolution resolution) {
// Get database and collection
MongoDatabase db = getDatabase(dbName);
MongoCollection collection = db.getCollection(collectionName);
// Build the query
BasicDBObject query = buildQueryForInsertAggregated(calendar, entityId, entityType, attrName, resolution);
// Prepopulate if needed
BasicDBObject insert = buildInsertForPrepopulate(attrType, resolution, true);
UpdateResult res = collection.updateOne(query, insert, new UpdateOptions().upsert(true));
if (res.getMatchedCount() == 0) {
LOGGER.debug("Prepopulating data, database=" + dbName + ", collection=" + collectionName + ", query="
+ query.toString() + ", insert=" + insert.toString());
} // if
// Do the update
BasicDBObject update = buildUpdateForUpdate(attrType, calendar, max, min, sum, sum2, numSamples);
LOGGER.debug("Updating data, database=" + dbName + ", collection=" + collectionName + ", query="
+ query.toString() + ", update=" + update.toString());
collection.updateOne(query, update);
}
protected final FluentFuture<Integer> doUpsert(
final Constraints.ConstraintHost criteria,
final T document) {
checkNotNull(criteria, "criteria");
checkNotNull(document, "document");
return submit(new Callable<Integer>() {
@Override
public Integer call() {
collection().replaceOne(convertToBson(criteria), document, new UpdateOptions().upsert(true));
// upsert will always return 1:
// if document doesn't exists, it will be inserted (modCount == 1)
// if document exists, it will be updated (modCount == 1)
return 1;
}
});
}
/**
* Un-assigns a key/value property from the element. The object value of the
* removed property is returned.
*
* @param key the key of the property to remove from the element
* @return the object value associated with that key prior to removal. Should be
* instance of BsonValue
*/
@Override
public <T> T removeProperty(final String key) {
try {
BsonValue value = getProperty(key);
BsonDocument filter = new BsonDocument();
filter.put(Tokens.ID, new BsonString(this.id));
BsonDocument update = new BsonDocument();
update.put("$unset", new BsonDocument(key, new BsonNull()));
if (this instanceof ChronoVertex) {
graph.getVertexCollection().updateOne(filter, update, new UpdateOptions().upsert(true));
return (T) value;
} else {
graph.getEdgeCollection().updateOne(filter, update, new UpdateOptions().upsert(true));
return (T) value;
}
} catch (MongoWriteException e) {
throw e;
}
}
@Override
public void putAccount(AccountData account) {
BasicDBObject object = new BasicDBObject("_id", account.getId().getAddress());
if (account.isHuman()) {
object.put(ACCOUNT_HUMAN_DATA_FIELD, humanToObject(account.asHuman()));
} else if (account.isRobot()) {
object.put(ACCOUNT_ROBOT_DATA_FIELD, robotToObject(account.asRobot()));
} else {
throw new IllegalStateException("Account is neither a human nor a robot");
}
getAccountCollection().replaceOne(
filterAccountBy(account.getId()),
object,
new UpdateOptions().upsert(true));
}
private void insertContextDataAggregatedForResoultion(String dbName, String collectionName,
GregorianCalendar calendar, String entityId, String entityType, String attrName, String attrType,
HashMap<String, Integer> counts, Resolution resolution) {
// Get database and collection
MongoDatabase db = getDatabase(dbName);
MongoCollection collection = db.getCollection(collectionName);
// Build the query
BasicDBObject query = buildQueryForInsertAggregated(calendar, entityId, entityType, attrName, resolution);
// Prepopulate if needed
BasicDBObject insert = buildInsertForPrepopulate(attrType, resolution, false);
UpdateResult res = collection.updateOne(query, insert, new UpdateOptions().upsert(true));
if (res.getMatchedCount() == 0) {
LOGGER.debug("Prepopulating data, database=" + dbName + ", collection=" + collectionName + ", query="
+ query.toString() + ", insert=" + insert.toString());
} // if
// Do the update
for (String key : counts.keySet()) {
int count = counts.get(key);
BasicDBObject update = buildUpdateForUpdate(attrType, resolution, calendar, key, count);
LOGGER.debug("Updating data, database=" + dbName + ", collection=" + collectionName + ", query="
+ query.toString() + ", update=" + update.toString());
collection.updateOne(query, update);
} // for
}
/**
* Clear Static Properties
*/
@SuppressWarnings("deprecation")
public void clearStaticProperties() {
BsonDocument filter = new BsonDocument();
filter.put(Tokens.ID, new BsonString(this.id));
if (this instanceof ChronoVertex) {
graph.getVertexCollection().replaceOne(filter, Converter.getBaseVertexDocument(this.id),
new UpdateOptions().upsert(true));
} else {
ChronoEdge ce = (ChronoEdge) this;
graph.getEdgeCollection().replaceOne(filter,
Converter.getBaseEdgeDocument(this.id, ce.getOutVertex().id, ce.getInVertex().id, ce.getLabel()),
new UpdateOptions().upsert(true));
}
}
@Test
void updateOne() {
ReactiveMongoDatabase database = client.getDatabase(DATABASE);
ReactiveMongoCollection<Document> collection = database.getCollection("test");
CompletableFuture.allOf(
collection
.insertOne(new Document("id", 1).append("name", "superman").append("type", "heroes")
.append("stars", 5))
.subscribeAsCompletionStage(),
collection.insertOne(
new Document("id", 2).append("name", "batman").append("type", "heroes").append("stars", 4))
.subscribeAsCompletionStage(),
collection
.insertOne(new Document("id", 3).append("name", "frogman").append("type", "villain")
.append("stars", 1))
.subscribeAsCompletionStage(),
collection.insertOne(
new Document("id", 4).append("name", "joker").append("type", "villain").append("stars", 5))
.subscribeAsCompletionStage())
.join();
UpdateResult result = collection
.updateOne(new Document("id", 3), inc("stars", 3), new UpdateOptions().bypassDocumentValidation(true))
.await().indefinitely();
UpdateResult result2 = collection.updateOne(new Document("id", 2), inc("stars", -1)).await().indefinitely();
UpdateResult result3 = collection.updateOne(new Document("id", 50), inc("stars", -1)).await().indefinitely();
assertThat(result.getMatchedCount()).isEqualTo(1);
assertThat(result.getModifiedCount()).isEqualTo(1);
assertThat(result2.getMatchedCount()).isEqualTo(1);
assertThat(result2.getModifiedCount()).isEqualTo(1);
assertThat(result3.getMatchedCount()).isEqualTo(0);
assertThat(result3.getModifiedCount()).isEqualTo(0);
}
/**
* 更新或插入数据集合
*
* @param dbName
* @param collectionName
* @param document
* @param filter
* @return
*/
public boolean upsertMany(String dbName, String collectionName, Document document, Document filter) {
if (document != null) {
UpdateOptions updateOptions = new UpdateOptions();
updateOptions.upsert(true);
mongoClient.getDatabase(dbName).getCollection(collectionName).updateMany(filter, document, updateOptions);
return true;
}
return false;
}
public void Save(Document doc) {
if (!doc.containsKey("_id")) {
Create(doc);
return;
}
Document find = new Document("_id", doc.get("_id"));
UpdateOptions uo = new UpdateOptions();
uo.upsert(true);
ops.add(new ReplaceOneModel<Document>(find, doc, uo));
FlushOpsIfFull();
}
/**
* {@code value} must be serializable or serialized.
*/
public <T> Mono<UpdateResult> updateSetting(Setting setting, T value) {
LOGGER.debug("[DBGuild {}] Setting update: {}={}", this.getId().asLong(), setting, value);
return Mono.from(DatabaseManager.getGuilds()
.getCollection()
.updateOne(
Filters.eq("_id", this.getId().asString()),
Updates.set(String.format("settings.%s", setting), value),
new UpdateOptions().upsert(true)))
.doOnNext(result -> LOGGER.trace("[DBGuild {}] Setting update result: {}",
this.getId().asLong(), result))
.doOnTerminate(() -> DB_REQUEST_COUNTER.labels(GuildsCollection.NAME).inc());
}
@Override
public Mono<Void> insert() {
LOGGER.debug("[DBMember {} / {}] Insertion", this.getId().asLong(), this.getGuildId().asLong());
return Mono.from(DatabaseManager.getGuilds()
.getCollection()
.updateOne(Filters.eq("_id", this.getGuildId().asString()),
Updates.push("members", this.toDocument()),
new UpdateOptions().upsert(true)))
.doOnNext(result -> LOGGER.trace("[DBMember {} / {}] Insertion result: {}",
this.getId().asLong(), this.getGuildId().asLong(), result))
.doOnTerminate(() -> DB_REQUEST_COUNTER.labels(GuildsCollection.NAME).inc())
.then();
}
public Mono<UpdateResult> addToJackpot(long coins) {
final long value = (long) Math.ceil(coins / 100.0f);
LOGGER.debug("[Lottery] Jackpot update: {} coins", value);
return Mono.from(this.getCollection()
.updateOne(Filters.eq("_id", "jackpot"),
Updates.inc("jackpot", value),
new UpdateOptions().upsert(true)))
.doOnNext(result -> LOGGER.trace("[Lottery] Jackpot update result: {}", result))
.doOnTerminate(() -> DB_REQUEST_COUNTER.labels(LotteryCollection.NAME).inc());
}
public SyncUpdateResult execute(@Nullable final CoreStitchServiceClient service) {
final UpdateResult localResult = this.dataSynchronizer.updateOne(
namespace,
filter,
update,
new UpdateOptions().upsert(this.syncUpdateOptions.isUpsert()));
return new SyncUpdateResult(
localResult.getMatchedCount(),
localResult.getModifiedCount(),
localResult.getUpsertedId()
);
}
public SyncUpdateResult execute(@Nullable final CoreStitchServiceClient service) {
final UpdateResult localResult = this.dataSynchronizer.updateMany(
namespace,
filter,
update,
new UpdateOptions().upsert(this.syncUpdateOptions.isUpsert()));
return new SyncUpdateResult(
localResult.getMatchedCount(),
localResult.getModifiedCount(),
localResult.getUpsertedId()
);
}
@Override
public WriteModel<Document> toMongo() {
final Bson filter = getFilter();
final Bson update = new BsonDocument().append(SET,
new BsonDocument().append(FIELD_DELETE_AT, new BsonDateTime(0L)));
final UpdateOptions updateOptions = new UpdateOptions().bypassDocumentValidation(true);
return new UpdateOneModel<>(filter, update, updateOptions);
}
@Override
public Source<List<Throwable>, NotUsed> purge(final CharSequence namespace) {
final Bson filter = thingNamespaceFilter(namespace);
final Bson update = new BsonDocument().append(AbstractWriteModel.SET,
new BsonDocument().append(FIELD_DELETE_AT, new BsonDateTime(0L)));
final UpdateOptions updateOptions = new UpdateOptions().bypassDocumentValidation(true);
final WriteModel<Document> writeModel = new UpdateManyModel<>(filter, update, updateOptions);
return Source.fromPublisher(collection.bulkWrite(Collections.singletonList(writeModel)))
.map(bulkWriteResult -> Collections.<Throwable>emptyList())
.recoverWithRetries(1, new PFBuilder<Throwable, Source<List<Throwable>, NotUsed>>()
.matchAny(throwable -> Source.single(Collections.singletonList(throwable)))
.build());
}
@Override
public void asyncUpdate(
String database,
String collection,
BsonDocument selector,
BsonDocument update,
boolean upsert,
boolean multiUpdate) throws MongoException {
try {
UpdateOptions updateOptions = new UpdateOptions().upsert(
upsert
);
MongoCollection<org.bson.BsonDocument> mongoCollection =
owner.getDriverClient()
.getDatabase(database)
.getCollection(collection, org.bson.BsonDocument.class);
org.bson.BsonDocument translatedUpdate =
MongoBsonTranslator.translate(update);
if (multiUpdate) {
mongoCollection.updateMany(translatedUpdate, translatedUpdate, updateOptions);
} else {
mongoCollection.updateOne(translatedUpdate, translatedUpdate, updateOptions);
}
} catch (com.mongodb.MongoException ex) { //a general Mongo driver exception
if (ErrorCode.isErrorCode(ex.getCode())) {
throw toMongoException(ex);
} else {
throw toRuntimeMongoException(ex);
}
}
}
/**
* Put the records in the sink.
*
* @param collection the set of records to send.
*/
@Override
public void put(Collection<SinkRecord> collection) {
List<SinkRecord> records = new ArrayList<>(collection);
for (int i = 0; i < records.size(); i++) {
Map<String, List<WriteModel<Document>>> bulks = new HashMap<>();
for (int j = 0; j < bulkSize && i < records.size(); j++, i++) {
SinkRecord record = records.get(i);
Map<String, Object> jsonMap = SchemaUtils.toJsonMap((Struct) record.value());
String topic = record.topic();
if (bulks.get(topic) == null) {
bulks.put(topic, new ArrayList<WriteModel<Document>>());
}
Document newDocument = new Document(jsonMap)
.append("_id", record.kafkaOffset());
log.trace("Adding to bulk: {}", newDocument.toString());
bulks.get(topic).add(new UpdateOneModel<Document>(
Filters.eq("_id", record.kafkaOffset()),
new Document("$set", newDocument),
new UpdateOptions().upsert(true)));
}
i--;
log.trace("Executing bulk");
for (String key : bulks.keySet()) {
try {
com.mongodb.bulk.BulkWriteResult result = mapping.get(key).bulkWrite(bulks.get(key));
} catch (Exception e) {
log.error(e.getMessage());
}
}
}
}
@Override
public void updateUser(String userId, User user) throws IResourceStore.ResourceStoreException {
String jsonUser = serialize(user);
Document document = Document.parse(jsonUser);
collection.updateOne(new Document("_id", new ObjectId(userId)),
new Document("$set", document), new UpdateOptions().upsert(true));
}
@Override
public void store(IResource currentResource) {
Resource resource = checkInternalResource(currentResource);
if (resource.getId() == null) {
currentCollection.insertOne(resource.getMongoDocument());
} else {
currentCollection.updateOne(
Filters.eq("_id", new ObjectId(resource.getId())),
new Document("$set", resource.getMongoDocument()),
new UpdateOptions().upsert(true));
}
}
@Override
public void updatePermissions(String resourceId, Permissions permissions) throws IResourceStore.ResourceStoreException {
String jsonPermissions = serialize(permissions);
Document permissionsDocument = Document.parse(jsonPermissions);
permissionsDocument.put("_id", new ObjectId(resourceId));
collection.updateOne(new Document("_id", new ObjectId(resourceId)),
new Document("$set", permissions), new UpdateOptions().upsert(true));
}
@Override
public Observable<UpdateResult> replaceOne(final Bson filter, final TDocument replacement, final UpdateOptions options) {
return RxObservables.create(Observables.observe(new Block<SingleResultCallback<UpdateResult>>() {
@Override
public void apply(final SingleResultCallback<UpdateResult> callback) {
wrapped.replaceOne(filter, replacement, options, callback);
}
}), observableAdapter);
}