下面列出了怎么用com.mongodb.client.model.InsertManyOptions的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Turns every position of the given page into one document and writes all of
 * them to the target collection with a single ordered bulk insert.
 *
 * @param page the page whose positions become documents; each channel is
 *             stored under the name of the column handle at the same index
 * @return {@code NOT_BLOCKED}, once the insert call has returned
 */
@Override
public CompletableFuture<?> appendPage(Page page)
{
    MongoCollection<Document> target = mongoSession.getCollection(schemaTableName);

    int rowCount = page.getPositionCount();
    int channelCount = page.getChannelCount();
    List<Document> rows = new ArrayList<>(rowCount);

    for (int row = 0; row < rowCount; row++) {
        Document rowDocument = new Document();
        for (int channel = 0; channel < channelCount; channel++) {
            MongoColumnHandle handle = columns.get(channel);
            Object value = getObjectValue(handle.getType(), page.getBlock(channel), row);
            rowDocument.append(handle.getName(), value);
        }
        rows.add(rowDocument);
    }

    InsertManyOptions orderedInsert = new InsertManyOptions().ordered(true);
    target.insertMany(rows, orderedInsert);
    return NOT_BLOCKED;
}
/**
 * Stores the client, collection and destroy function, builds the
 * {@link InsertManyOptions} from the two flags, and starts with an empty
 * document buffer.
 *
 * @param client           the Mongo client kept by this context
 * @param collection       the collection kept by this context
 * @param destroyFn        function kept for later use on the client
 * @param ordered          value passed to {@code InsertManyOptions.ordered}
 * @param bypassValidation value passed to {@code InsertManyOptions.bypassDocumentValidation}
 */
MongoSinkContext(
        MongoClient client,
        MongoCollection<T> collection,
        ConsumerEx<MongoClient> destroyFn,
        boolean ordered,
        boolean bypassValidation
) {
    InsertManyOptions options = new InsertManyOptions();
    options.ordered(ordered);
    options.bypassDocumentValidation(bypassValidation);

    this.client = client;
    this.collection = collection;
    this.destroyFn = destroyFn;
    this.insertManyOptions = options;
    this.documents = new ArrayList<>();
}
/**
 * Writes the buffered batch to the configured database/collection and then
 * empties the buffer. Does nothing when the batch is empty.
 *
 * <p>A {@link MongoBulkWriteException} is rethrown only when
 * {@code spec.ordered()} is true; for unordered writes it is deliberately
 * suppressed and the batch is still cleared.
 */
private void flush() {
    if (!batch.isEmpty()) {
        MongoCollection<Document> target =
                client.getDatabase(spec.database()).getCollection(spec.collection());
        try {
            InsertManyOptions options = new InsertManyOptions().ordered(spec.ordered());
            target.insertMany(batch, options);
        } catch (MongoBulkWriteException e) {
            boolean failFast = spec.ordered();
            if (failFast) {
                throw e;
            }
            // unordered mode: write errors are intentionally not propagated
        }
        batch.clear();
    }
}
/**
 * Validates every entity and inserts all of them with one unordered bulk
 * insert. Elapsed time and entity count are tracked and logged whether or not
 * the insert succeeds.
 *
 * @param entities the entities to insert; must be non-null and non-empty
 * @throws Error if {@code entities} is null or empty
 */
@Override
public void bulkInsert(List<T> entities) {
    var watch = new StopWatch();
    boolean hasEntities = entities != null && !entities.isEmpty();
    if (!hasEntities) {
        throw new Error("entities must not be empty");
    }
    for (T candidate : entities) {
        validator.validate(candidate);
    }
    InsertManyOptions unordered = new InsertManyOptions().ordered(false);
    try {
        collection().insertMany(entities, unordered);
    } finally {
        long elapsed = watch.elapsed();
        int size = entities.size();
        ActionLogContext.track("mongo", elapsed, 0, size);
        logger.debug("bulkInsert, collection={}, size={}, elapsed={}", collectionName, size, elapsed);
        checkSlowOperation(elapsed);
    }
}
/**
 * If environment variable ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT is true,
 * this method will insert local notes into MongoDB on startup.
 * If a note already exists in MongoDB, skip it.
 *
 * <p>When the local repository contains no notes, nothing is sent to MongoDB
 * (the driver rejects an empty document list). The local repository is closed
 * even if listing, reading or inserting notes fails.
 *
 * @throws IOException if the local notebook repository cannot be read
 */
private void insertFileSystemNotes() throws IOException {
    OldNotebookRepo vfsRepo = new OldVFSNotebookRepo();
    vfsRepo.init(this.conf);
    try {
        // collect notes to be imported
        List<Document> docs = new LinkedList<>();
        for (OldNoteInfo info : vfsRepo.list(null)) {
            Note note = vfsRepo.get(info.getId(), null);
            docs.add(noteToDocument(note));
        }

        // insertMany() throws IllegalArgumentException for an empty list,
        // so there is nothing to do when no local notes exist.
        if (docs.isEmpty()) {
            return;
        }

        /*
         * 'ordered(false)' option allows to proceed bulk inserting even though
         * there are duplicated documents. The duplicated documents will be skipped
         * and print a WARN log.
         */
        try {
            coll.insertMany(docs, new InsertManyOptions().ordered(false));
        } catch (MongoBulkWriteException e) {
            printDuplicatedException(e); // print duplicated document warning log
        }
    } finally {
        vfsRepo.close(); // it does nothing for now but maybe in the future...
    }
}
/**
 * Inserts the given documents into {@code database.collection} with a single
 * bulk insert.
 *
 * @param database        name of the target database
 * @param collection      name of the target collection
 * @param continueOnError when {@code true}, the insert is sent as an
 *                        <em>unordered</em> bulk write so the remaining
 *                        documents are still attempted after a failure; when
 *                        {@code false}, the insert is ordered and stops at
 *                        the first failure
 * @param docsToInsert    the documents to insert
 * @throws MongoException if the driver reports an error whose code is a
 *                        recognised {@code ErrorCode}; other driver errors
 *                        are rethrown as the runtime variant
 */
@Override
public void asyncInsert(
        String database,
        String collection,
        boolean continueOnError,
        List<? extends BsonDocument> docsToInsert) throws MongoException {
    try {
        // "continue on error" is the opposite of an ordered bulk write:
        // ordered writes abort on the first failing document.
        InsertManyOptions options = new InsertManyOptions().ordered(!continueOnError);
        owner.getDriverClient()
                .getDatabase(database)
                .getCollection(collection, BsonDocument.class)
                .insertMany(docsToInsert, options);
    } catch (com.mongodb.MongoException ex) { // a general Mongo driver exception
        if (ErrorCode.isErrorCode(ex.getCode())) {
            throw toMongoException(ex);
        } else {
            throw toRuntimeMongoException(ex);
        }
    }
}
/**
 * Converts each alias to a document (via
 * {@code DocumentTransferTool.toDocument(info, "address")}) and stores them in
 * the table {@code ALIAS_TABLE + chainId} with one unordered bulk insert.
 * An empty input list is a no-op.
 *
 * @param chainId       chain id appended to the table name
 * @param aliasInfoList the aliases to store
 */
public void saveAliasList(int chainId, List<AliasInfo> aliasInfoList) {
    if (!aliasInfoList.isEmpty()) {
        List<Document> aliasDocuments = new ArrayList<>();
        for (AliasInfo alias : aliasInfoList) {
            aliasDocuments.add(DocumentTransferTool.toDocument(alias, "address"));
        }
        InsertManyOptions unordered = new InsertManyOptions().ordered(false);
        mongoDBService.insertMany(ALIAS_TABLE + chainId, aliasDocuments, unordered);
    }
}
/**
 * Converts each coin-data entry with its own {@code toDocument()} and stores
 * the results in the table {@code COINDATA_TABLE + chainId} with one unordered
 * bulk insert. An empty input list is a no-op.
 *
 * @param chainId      chain id appended to the table name
 * @param coinDataList the coin-data entries to store
 */
public void saveCoinDataList(int chainId, List<CoinDataInfo> coinDataList) {
    if (!coinDataList.isEmpty()) {
        List<Document> coinDocuments = new ArrayList<>();
        for (CoinDataInfo coinData : coinDataList) {
            Document converted = coinData.toDocument();
            coinDocuments.add(converted);
        }
        InsertManyOptions unordered = new InsertManyOptions().ordered(false);
        mongoDBService.insertMany(COINDATA_TABLE + chainId, coinDocuments, unordered);
    }
}
/**
 * Converts each punish log with {@code DocumentTransferTool.toDocument} and
 * stores the results in the table {@code PUNISH_TABLE + chainId} with one
 * unordered bulk insert. An empty input list is a no-op.
 *
 * @param chainId       chain id appended to the table name
 * @param punishLogList the punish logs to store
 */
public void savePunishList(int chainId, List<PunishLogInfo> punishLogList) {
    if (!punishLogList.isEmpty()) {
        List<Document> punishDocuments = new ArrayList<>();
        for (PunishLogInfo entry : punishLogList) {
            Document converted = DocumentTransferTool.toDocument(entry);
            punishDocuments.add(converted);
        }
        InsertManyOptions unordered = new InsertManyOptions().ordered(false);
        mongoDBService.insertMany(PUNISH_TABLE + chainId, punishDocuments, unordered);
    }
}
/**
 * Inserts 100 documents {@code {i: 0..99}} with an ordered
 * {@link InsertManyOptions} and verifies counts and queries on the result.
 */
@Test
void testInsertionWithOptionsOfManyDocumentsAndQueries() {
    ReactiveMongoDatabase database = client.getDatabase(DATABASE);
    ReactiveMongoCollection<Document> collection = database.getCollection(randomAlphaString(8));

    List<Document> documents = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
        documents.add(new Document("i", i));
    }

    collection.insertMany(documents, new InsertManyOptions().ordered(true)).await().indefinitely();

    Long count = collection.countDocuments().await().indefinitely();
    assertThat(count).isEqualTo(100);

    count = collection.countDocuments(eq("i", 10)).await().indefinitely();
    assertThat(count).isEqualTo(1);

    // The original lambdas called assertThat(...) without any terminal check,
    // so they verified nothing. getInteger (no default) also makes a missing
    // "i" field fail instead of being masked by a default value.
    Optional<Document> document = collection.find().collectItems().first().await().asOptional().indefinitely();
    // No sort is applied, so only assert that the value is one of the inserted ones.
    assertThat(document).isNotEmpty()
            .hasValueSatisfying(doc -> assertThat(doc.getInteger("i")).isBetween(0, 99));

    document = collection.find(eq("i", 20)).collectItems().first().await().asOptional().indefinitely();
    assertThat(document).isNotEmpty()
            .hasValueSatisfying(doc -> assertThat(doc.getInteger("i")).isEqualTo(20));

    List<Document> list = collection.find().collectItems().asList().await().indefinitely();
    assertThat(list).hasSize(100);

    // i in 51..99 -> 49 documents
    list = collection.find(gt("i", 50)).collectItems().asList().await().indefinitely();
    assertThat(list).hasSize(49);
}
/**
 * Wraps the callback-based {@code wrapped.insertMany(documents, options, ...)}
 * call in an {@code Observable}, using this instance's observable adapter.
 *
 * @param documents the documents to insert
 * @param options   the options to apply to the operation
 * @return the Observable created from the wrapped insert call
 */
@Override
public Observable<Success> insertMany(final List<? extends TDocument> documents, final InsertManyOptions options) {
    final Block<SingleResultCallback<Success>> insertOperation = new Block<SingleResultCallback<Success>>() {
        @Override
        public void apply(final SingleResultCallback<Success> callback) {
            wrapped.insertMany(documents, options, voidToSuccessCallback(callback));
        }
    };
    return RxObservables.create(Observables.observe(insertOperation), observableAdapter);
}
/**
 * Builds a non-directory folder entry for the note and inserts it into the
 * {@code folders} collection using an unordered single-document
 * {@code insertMany}.
 *
 * @param noteId   stored under {@code Fields.ID}
 * @param noteName stored under {@code Fields.NAME}
 * @param pId      stored under {@code Fields.PID}
 */
private void saveNotePathOrIgnore(String noteId, String noteName, String pId) {
    Document pathEntry = new Document(Fields.ID, noteId);
    pathEntry.append(Fields.PID, pId);
    pathEntry.append(Fields.IS_DIR, false);
    pathEntry.append(Fields.NAME, noteName);

    InsertManyOptions unordered = new InsertManyOptions().ordered(false);
    folders.insertMany(Collections.singletonList(pathEntry), unordered);
}
/**
 * Wraps the callback-based {@code wrapped.insertMany(documents, options, ...)}
 * call in a {@code Publisher}.
 *
 * @param documents the documents to insert
 * @param options   the options to apply to the operation
 * @return a publisher backed by the wrapped insert call
 */
@Override
public Publisher<Success> insertMany(final List<? extends TDocument> documents, final InsertManyOptions options) {
    final Block<com.mongodb.async.SingleResultCallback<Success>> insertOperation =
            new Block<com.mongodb.async.SingleResultCallback<Success>>() {
                @Override
                public void apply(final com.mongodb.async.SingleResultCallback<Success> callback) {
                    wrapped.insertMany(documents, options, voidToSuccessCallback(callback));
                }
            };
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(insertOperation));
}
/**
 * Session-aware variant: wraps the callback-based
 * {@code wrapped.insertMany(clientSession.getWrapped(), documents, options, ...)}
 * call in a {@code Publisher}.
 *
 * @param clientSession the client session whose wrapped session is passed on
 * @param documents     the documents to insert
 * @param options       the options to apply to the operation
 * @return a publisher backed by the wrapped insert call
 */
@Override
public Publisher<Success> insertMany(final ClientSession clientSession, final List<? extends TDocument> documents,
                                     final InsertManyOptions options) {
    final Block<com.mongodb.async.SingleResultCallback<Success>> insertOperation =
            new Block<com.mongodb.async.SingleResultCallback<Success>>() {
                @Override
                public void apply(final com.mongodb.async.SingleResultCallback<Success> callback) {
                    wrapped.insertMany(clientSession.getWrapped(), documents, options, voidToSuccessCallback(callback));
                }
            };
    return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(insertOperation));
}
/**
 * Inserts the given documents into the {@code "EventData"} collection with an
 * unordered bulk insert and reports the outcome as a map.
 *
 * @param bsonDocumentList the documents to insert
 * @return a map with key {@code "eventCaptured"} (the size of the input list)
 *         on success, or key {@code "error"} (the exception message) when a
 *         {@link MongoBulkWriteException} is thrown
 */
public HashMap<String, Object> capture(List<BsonDocument> bsonDocumentList) {
    HashMap<String, Object> result = new HashMap<String, Object>();
    MongoCollection<BsonDocument> eventData = Configuration.mongoDatabase.getCollection("EventData",
            BsonDocument.class);
    try {
        InsertManyOptions unordered = new InsertManyOptions().ordered(false);
        eventData.insertMany(bsonDocumentList, unordered);
        result.put("eventCaptured", bsonDocumentList.size());
    } catch (MongoBulkWriteException e) {
        result.put("error", e.getMessage());
    }
    return result;
}
/**
 * Inserts the given documents into the {@code "EventData"} collection with an
 * unordered bulk insert and reports the outcome as a map.
 *
 * @param bsonDocumentList the documents to insert
 * @return a map with key {@code "eventCaptured"} (the size of the input list)
 *         on success, or key {@code "error"} (the exception message) when a
 *         {@link MongoBulkWriteException} is thrown
 */
public HashMap<String, Object> capture(List<BsonDocument> bsonDocumentList) {
    HashMap<String, Object> result = new HashMap<String, Object>();
    MongoCollection<BsonDocument> eventData = Configuration.mongoDatabase.getCollection("EventData",
            BsonDocument.class);
    try {
        InsertManyOptions unordered = new InsertManyOptions().ordered(false);
        eventData.insertMany(bsonDocumentList, unordered);
        result.put("eventCaptured", bsonDocumentList.size());
    } catch (MongoBulkWriteException e) {
        result.put("error", e.getMessage());
    }
    return result;
}
/**
 * Stores the given transactions in the table {@code TX_TABLE + chainId},
 * writing them in unordered bulk inserts of at most 1000 documents each.
 * An empty input list is a no-op.
 *
 * <p>For every transaction whose hash is present in
 * {@code txUnConfirmHashSet}, {@code deleteUnConfirmTx} is called before the
 * transaction is queued for insertion.
 *
 * <p>Note: an earlier, disabled (commented-out) step that deleted the oldest
 * records once the table exceeded 1,000,000 entries is not performed here.
 *
 * @param chainId chain id appended to the table name
 * @param txList  the transactions to store
 */
public void saveTxList(int chainId, List<TransactionInfo> txList) {
    if (txList.isEmpty()) {
        return;
    }
    // maximum number of documents sent per insertMany call
    final int batchSize = 1000;

    InsertManyOptions options = new InsertManyOptions().ordered(false);
    List<Document> documentList = new ArrayList<>(Math.min(txList.size(), batchSize));
    for (TransactionInfo txInfo : txList) {
        if (txUnConfirmHashSet.contains(txInfo.getHash())) {
            deleteUnConfirmTx(chainId, txInfo.getHash());
        }
        documentList.add(txInfo.toDocument());
        if (documentList.size() == batchSize) {
            mongoDBService.insertMany(TX_TABLE + chainId, documentList, options);
            documentList.clear();
        }
    }
    // flush the final, partially filled batch
    if (!documentList.isEmpty()) {
        mongoDBService.insertMany(TX_TABLE + chainId, documentList, options);
    }
}
/**
 * Delegates to {@code collection.insertMany(tDocuments, options)} and adapts
 * the result with {@code Wrappers.toUni}.
 *
 * @param tDocuments the documents to insert
 * @param options    the options to apply to the operation
 * @return the {@code Uni} produced from the underlying insert
 */
@Override
public Uni<InsertManyResult> insertMany(List<? extends T> tDocuments, InsertManyOptions options) {
    return Wrappers.toUni(
            collection.insertMany(tDocuments, options));
}
/**
 * Session-aware variant: delegates to
 * {@code collection.insertMany(clientSession, tDocuments, options)} and adapts
 * the result with {@code Wrappers.toUni}.
 *
 * @param clientSession the client session passed to the underlying insert
 * @param tDocuments    the documents to insert
 * @param options       the options to apply to the operation
 * @return the {@code Uni} produced from the underlying insert
 */
@Override
public Uni<InsertManyResult> insertMany(ClientSession clientSession, List<? extends T> tDocuments,
                                        InsertManyOptions options) {
    return Wrappers.toUni(
            collection.insertMany(clientSession, tDocuments, options));
}
/**
 * Inserts the documents using a freshly created, unmodified
 * {@link InsertManyOptions}.
 *
 * @param documents the documents to insert
 * @return the result of the two-argument {@code insertMany} overload
 */
@Override
public Observable<Success> insertMany(final List<? extends TDocument> documents) {
    final InsertManyOptions defaultOptions = new InsertManyOptions();
    return insertMany(documents, defaultOptions);
}
/**
 * Converts the note with {@code noteToDocument} and inserts it into the
 * {@code notes} collection using an unordered single-document
 * {@code insertMany}.
 *
 * @param note the note to store
 */
private void saveNoteOrIgnore(Note note) {
    Document noteDocument = noteToDocument(note);
    InsertManyOptions unordered = new InsertManyOptions().ordered(false);
    notes.insertMany(Collections.singletonList(noteDocument), unordered);
}
/**
 * Inserts all given documents into the collection with an unordered bulk
 * insert.
 *
 * @param items the documents to insert
 */
@Override
public void insertMany(final List<Document> items) {
    final InsertManyOptions unordered = new InsertManyOptions().ordered(false);
    collection.insertMany(items, unordered);
}
/**
 * Inserts the documents using a freshly created, unmodified
 * {@link InsertManyOptions}.
 *
 * @param documents the documents to insert
 * @return the result of the two-argument {@code insertMany} overload
 */
@Override
public Publisher<Success> insertMany(final List<? extends TDocument> documents) {
    final InsertManyOptions defaultOptions = new InsertManyOptions();
    return insertMany(documents, defaultOptions);
}
/**
 * Session-aware variant: inserts the documents using a freshly created,
 * unmodified {@link InsertManyOptions}.
 *
 * @param clientSession the client session passed on to the three-argument overload
 * @param documents     the documents to insert
 * @return the result of the three-argument {@code insertMany} overload
 */
@Override
public Publisher<Success> insertMany(final ClientSession clientSession, final List<? extends TDocument> documents) {
    final InsertManyOptions defaultOptions = new InsertManyOptions();
    return insertMany(clientSession, documents, defaultOptions);
}
/**
 * Inserts a batch of documents. The preferred way to perform bulk inserts is to use the BulkWrite API.
 *
 * @param documents the documents to insert
 * @param options the options to apply to the operation
 * @return a {@link Uni} that completes successfully when the operation completes, or that propagates a
 *         {@link com.mongodb.DuplicateKeyException} or {@link com.mongodb.MongoException} on failure
 */
Uni<InsertManyResult> insertMany(List<? extends T> documents, InsertManyOptions options);
/**
 * Inserts a batch of documents within the given client session. The preferred way to perform bulk inserts is
 * to use the BulkWrite API.
 *
 * @param clientSession the client session with which to associate this operation
 * @param documents the documents to insert
 * @param options the options to apply to the operation
 * @return a {@link Uni} that completes successfully when the operation completes, or that propagates a
 *         {@link com.mongodb.DuplicateKeyException} or {@link com.mongodb.MongoException} on failure
 */
Uni<InsertManyResult> insertMany(ClientSession clientSession, List<? extends T> documents,
InsertManyOptions options);
/**
 * Inserts a batch of documents. The preferred way to perform bulk inserts is to use the BulkWrite API. However, when talking with a
 * server &lt; 2.6, using this method will be faster due to constraints in the bulk API related to error handling.
 *
 * @param documents the documents to insert
 * @param options the options to apply to the operation
 * @return an Observable with a single element indicating when the operation has completed, or signalling either a
 *         {@code com.mongodb.DuplicateKeyException} or a {@code com.mongodb.MongoException}
 */
Observable<Success> insertMany(List<? extends TDocument> documents, InsertManyOptions options);
/**
 * Inserts a batch of documents. The preferred way to perform bulk inserts is to use the BulkWrite API. However, when talking with a
 * server &lt; 2.6, using this method will be faster due to constraints in the bulk API related to error handling.
 *
 * @param documents the documents to insert
 * @param options the options to apply to the operation
 * @return a publisher with a single element indicating when the operation has completed, or signalling either a
 *         {@code com.mongodb.DuplicateKeyException} or a {@code com.mongodb.MongoException}
 */
Publisher<Success> insertMany(List<? extends TDocument> documents, InsertManyOptions options);
/**
 * Inserts a batch of documents within the given client session. The preferred way to perform bulk inserts is to use the BulkWrite
 * API. However, when talking with a server &lt; 2.6, using this method will be faster due to constraints in the bulk API related to
 * error handling.
 *
 * @param clientSession the client session with which to associate this operation
 * @param documents the documents to insert
 * @param options the options to apply to the operation
 * @return a publisher with a single element indicating when the operation has completed, or signalling either a
 *         {@code com.mongodb.DuplicateKeyException} or a {@code com.mongodb.MongoException}
 * @mongodb.server.release 3.6
 * @since 1.7
 */
Publisher<Success> insertMany(ClientSession clientSession, List<? extends TDocument> documents, InsertManyOptions options);