下面列出了com.mongodb.client.model.ReplaceOptions#upsert ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* 更新或插入数据
*
* @param dbName
* @param collectionName
* @param document
* @param filter
* @return
*/
public boolean upsert(String dbName, String collectionName, Document document, Document filter) {
if (document != null) {
ReplaceOptions replaceOptions = new ReplaceOptions();
replaceOptions.upsert(true);
mongoClient.getDatabase(dbName).getCollection(collectionName).replaceOne(filter, document, replaceOptions);
return true;
}
return false;
}
@Override
public boolean upsertBar(String dbName, String collectionName, List<BarField> barList) {
if (barList == null || barList.isEmpty()) {
logger.error("更新插入Bar集合错误,数据集合为空");
return false;
}
List<WriteModel<Document>> writeModelList = new ArrayList<WriteModel<Document>>();
long beginTime = System.currentTimeMillis();
for (BarField bar : barList) {
Document filterDocument = new Document();
filterDocument.put("unifiedSymbol", bar.getUnifiedSymbol());
filterDocument.put("actionTimestamp", bar.getActionTimestamp());
Document barDocument = barToDocument(bar);
ReplaceOptions replaceOptions = new ReplaceOptions();
replaceOptions.upsert(true);
ReplaceOneModel<Document> replaceOneModel = new ReplaceOneModel<Document>(filterDocument, barDocument, replaceOptions);
writeModelList.add(replaceOneModel);
}
logger.info("更新插入Bar集合,数据库{},集合{},数据转换耗时{}ms,共{}条数据", dbName, collectionName, (System.currentTimeMillis() - beginTime), barList.size());
beginTime = System.currentTimeMillis();
todayMarketDataDBClient.getDatabase(dbName).getCollection(collectionName).createIndex(Indexes.ascending("actionTimestamp", "unifiedSymbol"));
todayMarketDataDBClient.getDatabase(dbName).getCollection(collectionName).bulkWrite(writeModelList);
logger.info("更新插入Bar集合,数据库{},集合{},数据库操作耗时{}ms,共{}条操作", dbName, collectionName, (System.currentTimeMillis() - beginTime), writeModelList.size());
return true;
}
@Override
public boolean upsertTick(String dbName, String collectionName, List<TickField> tickList) {
if (tickList == null || tickList.isEmpty()) {
logger.error("更新插入Tick集合错误,数据集合为空");
return false;
}
List<WriteModel<Document>> writeModelList = new ArrayList<WriteModel<Document>>();
long beginTime = System.currentTimeMillis();
for (TickField tick : tickList) {
Document filterDocument = new Document();
filterDocument.put("unifiedSymbol", tick.getUnifiedSymbol());
filterDocument.put("actionTimestamp", tick.getActionTimestamp());
Document tickDocument = tickToDocument(tick);
ReplaceOptions replaceOptions = new ReplaceOptions();
replaceOptions.upsert(true);
ReplaceOneModel<Document> replaceOneModel = new ReplaceOneModel<Document>(filterDocument, tickDocument, replaceOptions);
writeModelList.add(replaceOneModel);
}
logger.info("更新插入Tick集合,数据库{},集合{},数据转换耗时{}ms,共{}条数据", dbName, collectionName, (System.currentTimeMillis() - beginTime), tickList.size());
beginTime = System.currentTimeMillis();
todayMarketDataDBClient.getDatabase(dbName).getCollection(collectionName).createIndex(Indexes.ascending("actionTimestamp"));
todayMarketDataDBClient.getDatabase(dbName).getCollection(collectionName).bulkWrite(writeModelList);
logger.info("更新插入Tick集合,数据库{},集合{},数据库操作耗时{}ms,共{}条操作", dbName, collectionName, (System.currentTimeMillis() - beginTime), writeModelList.size());
return true;
}
/**
* Uses <a href="https://docs.mongodb.com/manual/reference/method/db.collection.replaceOne/">replaceOne</a> operation
* with <a href="https://docs.mongodb.com/manual/reference/method/db.collection.bulkWrite/">bulkWrite</a>. Right now has to convert
* object to BsonDocument to extract {@code _id} attribute.
*/
private <T> Publisher<WriteResult> update(StandardOperations.Update operation) {
ReplaceOptions options = new ReplaceOptions();
if (operation.upsert()) {
options.upsert(operation.upsert());
}
List<ReplaceOneModel<Object>> docs = operation.values().stream()
.map(value -> new ReplaceOneModel<>(new BsonDocument(Mongos.ID_FIELD_NAME, toBsonValue(keyExtractor.extract(value))), value, options))
.collect(Collectors.toList());
Publisher<BulkWriteResult> publisher = ((MongoCollection<Object>) collection).bulkWrite(docs);
return Flowable.fromPublisher(publisher).map(x -> WriteResult.unknown());
}