The following are example usages of com.mongodb.client.model.InsertOneModel together with com.mongodb.client.model.BulkWriteOptions, collected from open-source projects.
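Before the collected examples, here is a minimal sketch of the basic pattern, assuming an already-connected MongoCollection<Document> named collection (the variable and values are illustrative, not taken from the examples below):

import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import org.bson.Document;
import java.util.Arrays;
import java.util.List;

// Wrap each document in an InsertOneModel and hand the whole batch to bulkWrite.
List<WriteModel<Document>> writes = Arrays.asList(
        new InsertOneModel<>(new Document("_id", 1).append("x", 1)),
        new InsertOneModel<>(new Document("_id", 2).append("x", 2)));
// ordered(false) lets the server keep processing the batch after an individual write fails.
BulkWriteResult result = collection.bulkWrite(writes, new BulkWriteOptions().ordered(false));
System.out.println("inserted: " + result.getInsertedCount());
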
@Test
void bulkWrite() {
    ReactiveMongoDatabase database = client.getDatabase(DATABASE);
    ReactiveMongoCollection<Document> collection = database.getCollection("test");
    BulkWriteResult result = collection.bulkWrite(Arrays.asList(
            new InsertOneModel<>(new Document("_id", 4)),
            new InsertOneModel<>(new Document("_id", 5)),
            new InsertOneModel<>(new Document("_id", 6)),
            new UpdateOneModel<>(new Document("_id", 1),
                    new Document("$set", new Document("x", 2))),
            new DeleteOneModel<>(new Document("_id", 2)),
            new ReplaceOneModel<>(new Document("_id", 3),
                    new Document("_id", 3).append("x", 4))))
            .await().indefinitely();
    assertThat(result.getDeletedCount()).isEqualTo(0);
    assertThat(result.getInsertedCount()).isEqualTo(3);
}

@Test
void bulkWriteWithOptions() {
    ReactiveMongoDatabase database = client.getDatabase(DATABASE);
    ReactiveMongoCollection<Document> collection = database.getCollection("test");
    BulkWriteResult result = collection.bulkWrite(Arrays.asList(
            new InsertOneModel<>(new Document("_id", 4)),
            new InsertOneModel<>(new Document("_id", 5)),
            new InsertOneModel<>(new Document("_id", 6)),
            new UpdateOneModel<>(new Document("_id", 1),
                    new Document("$set", new Document("x", 2))),
            new DeleteOneModel<>(new Document("_id", 2)),
            new ReplaceOneModel<>(new Document("_id", 3),
                    new Document("_id", 3).append("x", 4))),
            new BulkWriteOptions().ordered(true)).await().indefinitely();
    assertThat(result.getDeletedCount()).isEqualTo(0);
    assertThat(result.getInsertedCount()).isEqualTo(3);
}

private static Uni<Void> persistOrUpdate(ReactiveMongoCollection collection, List<Object> entities) {
    // This will be an ordered bulk: it is less performant than an unordered one, but it fails at the first failed write.
    List<WriteModel> bulk = new ArrayList<>();
    for (Object entity : entities) {
        // We transform the entity into a document first.
        BsonDocument document = getBsonDocument(collection, entity);
        // Then we read its id field and build a document containing only that field, which becomes our replace query.
        BsonValue id = document.get(ID);
        if (id == null) {
            // Insert with an autogenerated ID.
            bulk.add(new InsertOneModel(entity));
        } else {
            // Insert with a user-provided ID, or update the existing document.
            BsonDocument query = new BsonDocument().append(ID, id);
            bulk.add(new ReplaceOneModel(query, entity,
                    new ReplaceOptions().upsert(true)));
        }
    }
    return collection.bulkWrite(bulk).onItem().ignore().andContinueWithNull();
}

private static void persistOrUpdate(MongoCollection collection, List<Object> entities) {
    // This will be an ordered bulk: it is less performant than an unordered one, but it fails at the first failed write.
    List<WriteModel> bulk = new ArrayList<>();
    for (Object entity : entities) {
        // We transform the entity into a document first.
        BsonDocument document = getBsonDocument(collection, entity);
        // Then we read its id field and build a document containing only that field, which becomes our replace query.
        BsonValue id = document.get(ID);
        if (id == null) {
            // Insert with an autogenerated ID.
            bulk.add(new InsertOneModel(entity));
        } else {
            // Insert with a user-provided ID, or update the existing document.
            BsonDocument query = new BsonDocument().append(ID, id);
            bulk.add(new ReplaceOneModel(query, entity,
                    new ReplaceOptions().upsert(true)));
        }
    }
    collection.bulkWrite(bulk);
}

private static void loadRecords(int totalRecords, String dataset, MongoCollection mc) throws IOException {
    List<Document> documents = new ArrayList<>(totalRecords);
    try (InputStream inputStream = QueryConverterIT.class.getResourceAsStream("/" + dataset + ".json");
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            documents.add(Document.parse(line));
        }
    }
    // Wrap every document in an InsertOneModel and write them in batches of 10,000 per bulk write.
    for (Iterator<List<WriteModel>> iterator = Iterables.partition(Lists.transform(documents, new Function<Document, WriteModel>() {
        @Override
        public WriteModel apply(Document document) {
            return new InsertOneModel(document);
        }
    }), 10000).iterator(); iterator.hasNext();) {
        mc.bulkWrite(iterator.next());
    }
}

public void saveBulkList(int chainId, List<BlockHeaderInfo> blockHeaderInfos) {
    List<WriteModel<Document>> modelList = new ArrayList<>();
    for (BlockHeaderInfo headerInfo : blockHeaderInfos) {
        Document document = DocumentTransferTool.toDocument(headerInfo, "height");
        modelList.add(new InsertOneModel<>(document));
    }
    long time1 = System.currentTimeMillis();
    mongoDBService.bulkWrite(BLOCK_HEADER_TABLE + chainId, modelList);
    long time2 = System.currentTimeMillis();
    System.out.println("---------------------use:" + (time2 - time1));
}

/**
 * Stores all documents in the stream. Documents that have an ID will replace existing
 * documents with that ID; documents without an ID will be inserted, and MongoDB will assign
 * them a unique ID.
 * @param topic topic to store the documents for
 * @param docs documents to insert
 * @throws MongoException if a document could not be stored
 */
public void store(String topic, Stream<Document> docs) throws MongoException {
    getCollection(topic).bulkWrite(docs
            .map(doc -> {
                Object mongoId = doc.get(MONGO_ID_KEY);
                if (mongoId != null) {
                    // Existing ID: replace the document, inserting it if it does not exist yet.
                    return new ReplaceOneModel<>(eq(MONGO_ID_KEY, mongoId), doc, UPDATE_UPSERT);
                } else {
                    // No ID: plain insert, MongoDB generates the _id.
                    return new InsertOneModel<>(doc);
                }
            })
            .collect(Collectors.toList()));
}

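A hypothetical usage sketch of the store method above (assuming MONGO_ID_KEY is "_id" and storage is an instance of the surrounding class; the topic name and documents are illustrative):

// The document with an existing _id is replaced or upserted; the other gets a server-generated _id.
Stream<Document> docs = Stream.of(
        new Document("_id", "header-1").append("value", 1),
        new Document("value", 2));
storage.store("example-topic", docs);
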
public MongoAdminClient importJsonFile(String fileNamePath) {
    int count = 0;
    int batch = 100;
    List<InsertOneModel<Document>> docs = new ArrayList<>();
    try (BufferedReader br = new BufferedReader(new FileReader(fileNamePath))) {
        String line;
        while ((line = br.readLine()) != null) {
            docs.add(new InsertOneModel<>(Document.parse(line)));
            count++;
            if (count == batch) {
                // Flush a full batch as an unordered bulk write, then start a new batch.
                this.collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
                docs.clear();
                count = 0;
            }
        }
    } catch (IOException ioe) {
        ioe.printStackTrace();
    }
    if (count > 0) {
        // Flush the remaining partial batch.
        collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
    }
    return this;
}

public MongoAdminClient importJsonInputStream(InputStream fileInputStream) {
    int count = 0;
    int batch = 100;
    List<InsertOneModel<Document>> docs = new ArrayList<>();
    try (BufferedReader br = new BufferedReader(new InputStreamReader(fileInputStream))) {
        String line;
        while ((line = br.readLine()) != null) {
            docs.add(new InsertOneModel<>(Document.parse(line)));
            count++;
            if (count == batch) {
                // Flush a full batch as an unordered bulk write, then start a new batch.
                this.collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
                docs.clear();
                count = 0;
            }
        }
    } catch (IOException ioe) {
        ioe.printStackTrace();
    }
    if (count > 0) {
        // Flush the remaining partial batch.
        collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
    }
    return this;
}

@Override
public StorageWriteResult write(List<DataContainer> containers) throws Exception {
    MongoCollection<Document> collection = MongoStorageAdapter.getCollection(MongoStorageAdapter.collectionEventRecordsName);
    // Build a list of insert models, one per container
    List<WriteModel<Document>> documents = new ArrayList<>();
    for (DataContainer container : containers) {
        Document document = documentFromView(container);
        // Prism.getInstance().getLogger().debug(DataUtil.jsonFromDataView(container).toString());
        // TTL
        if (expires) {
            document.append("Expires", DateUtil.parseTimeStringToDate(expiration, true));
        }
        // Insert
        documents.add(new InsertOneModel<>(document));
    }
    // Write
    collection.bulkWrite(documents, bulkWriteOptions);
    // @todo implement real results, BulkWriteResult
    return new StorageWriteResult();
}

public void Create(Document doc) {
    ops.add(new InsertOneModel<Document>(doc));
    FlushOpsIfFull();
}

private List<WriteModel<AccessTokenMongo>> convert(List<AccessToken> accessTokens) {
    return accessTokens.stream().map(accessToken -> new InsertOneModel<>(convert(accessToken))).collect(Collectors.toList());
}

private List<WriteModel<RefreshTokenMongo>> convert(List<RefreshToken> refreshTokens) {
    return refreshTokens.stream().map(refreshToken -> new InsertOneModel<>(convert(refreshToken))).collect(Collectors.toList());
}

private List<WriteModel<AuditMongo>> convert(List<Audit> audits) {
    return audits.stream().map(audit -> new InsertOneModel<>(convert(audit))).collect(Collectors.toList());
}