下面列出了怎么用com.mongodb.client.model.BulkWriteOptions的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
void bulkWriteWithOptions() {
ReactiveMongoDatabase database = client.getDatabase(DATABASE);
ReactiveMongoCollection<Document> collection = database.getCollection("test");
BulkWriteResult result = collection.bulkWrite(Arrays.asList(
new InsertOneModel<>(new Document("_id", 4)),
new InsertOneModel<>(new Document("_id", 5)),
new InsertOneModel<>(new Document("_id", 6)),
new UpdateOneModel<>(new Document("_id", 1),
new Document("$set", new Document("x", 2))),
new DeleteOneModel<>(new Document("_id", 2)),
new ReplaceOneModel<>(new Document("_id", 3),
new Document("_id", 3).append("x", 4))),
new BulkWriteOptions().ordered(true)).await().indefinitely();
assertThat(result.getDeletedCount()).isEqualTo(0);
assertThat(result.getInsertedCount()).isEqualTo(3);
}
private static Source<Optional<Throwable>, NotUsed> doDeleteByFilter(final MongoCollection<Document> collection,
final Bson filter) {
// https://stackoverflow.com/a/33164008
// claims unordered bulk ops halve MongoDB load
final List<WriteModel<Document>> writeModel =
Collections.singletonList(new DeleteManyModel<>(filter));
final BulkWriteOptions options = new BulkWriteOptions().ordered(false);
return Source.fromPublisher(collection.bulkWrite(writeModel, options))
.map(result -> {
if (LOGGER.isDebugEnabled()) {
// in contrast to Bson, BsonDocument has meaningful toString()
final BsonDocument filterBsonDoc = BsonUtil.toBsonDocument(filter);
LOGGER.debug("Deleted <{}> documents from collection <{}>. Filter was <{}>.",
result.getDeletedCount(), collection.getNamespace(), filterBsonDoc);
}
return Optional.<Throwable>empty();
})
.recoverWithRetries(RETRY_ATTEMPTS, new PFBuilder<Throwable, Source<Optional<Throwable>, NotUsed>>()
.matchAny(throwable -> Source.single(Optional.of(throwable)))
.build());
}
private Source<WriteResultAndErrors, NotUsed> executeBulkWrite(
final List<AbstractWriteModel> abstractWriteModels) {
final List<WriteModel<Document>> writeModels = abstractWriteModels.stream()
.map(AbstractWriteModel::toMongo)
.collect(Collectors.toList());
return Source.fromPublisher(collection.bulkWrite(writeModels, new BulkWriteOptions().ordered(false)))
.map(bulkWriteResult -> WriteResultAndErrors.success(abstractWriteModels, bulkWriteResult))
.recoverWithRetries(1, new PFBuilder<Throwable, Source<WriteResultAndErrors, NotUsed>>()
.match(MongoBulkWriteException.class, bulkWriteException ->
Source.single(WriteResultAndErrors.failure(abstractWriteModels, bulkWriteException))
)
.matchAny(error ->
Source.single(WriteResultAndErrors.unexpectedError(abstractWriteModels, error))
)
.build()
);
}
@Override
public long bulkDelete(List<?> ids) {
var watch = new StopWatch();
int size = ids.size();
int deletedRows = 0;
try {
List<DeleteOneModel<T>> models = new ArrayList<>(size);
for (Object id : ids) {
models.add(new DeleteOneModel<>(Filters.eq("_id", id)));
}
BulkWriteResult result = collection().bulkWrite(models, new BulkWriteOptions().ordered(false));
deletedRows = result.getDeletedCount();
return deletedRows;
} finally {
long elapsed = watch.elapsed();
ActionLogContext.track("mongo", elapsed, 0, deletedRows);
logger.debug("bulkDelete, collection={}, ids={}, size={}, deletedRows={}, elapsed={}", collectionName, ids, size, deletedRows, elapsed);
checkSlowOperation(elapsed);
}
}
public MongoAdminClient importJsonFile(String fileNamePath) {
int count = 0;
int batch = 100;
List<InsertOneModel<Document>> docs = new ArrayList<>();
try (BufferedReader br = new BufferedReader(new FileReader(fileNamePath))) {
String line;
while ((line = br.readLine()) != null) {
docs.add(new InsertOneModel<>(Document.parse(line)));
count++;
if (count == batch) {
this.collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
docs.clear();
count = 0;
}
}
} catch (IOException fnfe) {
fnfe.printStackTrace();
}
if (count > 0) {
collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
}
return this;
}
public MongoAdminClient importJsonInputStream(InputStream fileInputStream) {
int count = 0;
int batch = 100;
List<InsertOneModel<Document>> docs = new ArrayList<>();
try (BufferedReader br = new BufferedReader(new InputStreamReader(fileInputStream))) {
String line;
while ((line = br.readLine()) != null) {
docs.add(new InsertOneModel<>(Document.parse(line)));
count++;
if (count == batch) {
this.collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
docs.clear();
count = 0;
}
}
} catch (IOException fnfe) {
fnfe.printStackTrace();
}
if (count > 0) {
collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
}
return this;
}
@Override
public Observable<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests,
final BulkWriteOptions options) {
return RxObservables.create(Observables.observe(new Block<SingleResultCallback<BulkWriteResult>>() {
@Override
public void apply(final SingleResultCallback<BulkWriteResult> callback) {
wrapped.bulkWrite(requests, options, callback);
}
}), observableAdapter);
}
@Override
public Publisher<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests,
final BulkWriteOptions options) {
return new ObservableToPublisher<BulkWriteResult>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<BulkWriteResult>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<BulkWriteResult> callback) {
wrapped.bulkWrite(requests, options, callback);
}
}));
}
@Override
public Publisher<BulkWriteResult> bulkWrite(final ClientSession clientSession,
final List<? extends WriteModel<? extends TDocument>> requests,
final BulkWriteOptions options) {
return new ObservableToPublisher<BulkWriteResult>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<BulkWriteResult>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<BulkWriteResult> callback) {
wrapped.bulkWrite(clientSession.getWrapped(), requests, options, callback);
}
}));
}
@Override
public Uni<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends T>> requests,
BulkWriteOptions options) {
return Wrappers.toUni(collection.bulkWrite(requests, options));
}
@Override
public Uni<BulkWriteResult> bulkWrite(ClientSession clientSession,
List<? extends WriteModel<? extends T>> requests, BulkWriteOptions options) {
return Wrappers.toUni(collection.bulkWrite(clientSession, requests, options));
}
@SuppressWarnings("unchecked")
private void testStreamRestart(final Supplier<Throwable> errorSupplier) throws Exception {
new TestKit(actorSystem) {{
// GIVEN: The persistence fails with an error on every write
final MongoDatabase db = Mockito.mock(MongoDatabase.class);
final MongoCollection<Document> collection = Mockito.mock(MongoCollection.class);
final Publisher<BulkWriteResult> publisher = s -> s.onError(errorSupplier.get());
Mockito.when(db.getCollection(Mockito.any())).thenReturn(collection);
Mockito.when(collection.bulkWrite(Mockito.any(), Mockito.any(BulkWriteOptions.class)))
.thenReturn(publisher);
// GIVEN: MongoSearchUpdaterFlow is wrapped inside a RestartSink
final MongoSearchUpdaterFlow flow = MongoSearchUpdaterFlow.of(db);
final Sink<Source<AbstractWriteModel, NotUsed>, ?> sink =
flow.start(1, 1, Duration.ZERO).to(Sink.ignore());
final Sink<Source<AbstractWriteModel, NotUsed>, ?> restartSink =
RestartSink.withBackoff(Duration.ZERO, Duration.ZERO, 1.0, () -> sink);
// WHEN: Many changes stream through MongoSearchUpdaterFlow
final int numberOfChanges = 25;
final CountDownLatch latch = new CountDownLatch(numberOfChanges);
final AbstractWriteModel abstractWriteModel = Mockito.mock(AbstractWriteModel.class);
final WriteModel<Document> mongoWriteModel = new DeleteOneModel<>(new Document());
Mockito.when(abstractWriteModel.toMongo()).thenReturn(mongoWriteModel);
Source.repeat(Source.single(abstractWriteModel))
.take(numberOfChanges)
.buffer(1, OverflowStrategy.backpressure())
.map(source -> {
latch.countDown();
return source;
})
.runWith(restartSink, ActorMaterializer.create(actorSystem));
// THEN: MongoSearchUpdaterFlow should keep restarting and keep consuming changes from the stream
latch.await(5L, TimeUnit.SECONDS);
assertThat(latch.getCount()).isZero();
}};
}
@Override
public Observable<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests) {
return bulkWrite(requests, new BulkWriteOptions());
}
@Override
public Publisher<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests) {
return bulkWrite(requests, new BulkWriteOptions());
}
@Override
public Publisher<BulkWriteResult> bulkWrite(final ClientSession clientSession,
final List<? extends WriteModel<? extends TDocument>> requests) {
return bulkWrite(clientSession, requests, new BulkWriteOptions());
}
/**
* Executes a mix of inserts, updates, replaces, and deletes.
*
* @param requests the writes to execute
* @param options the options to apply to the bulk write operation
* @return a {@link Uni} receiving the {@link BulkWriteResult}
*/
Uni<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends T>> requests,
BulkWriteOptions options);
/**
* Executes a mix of inserts, updates, replaces, and deletes.
*
* @param clientSession the client session with which to associate this operation
* @param requests the writes to execute
* @param options the options to apply to the bulk write operation
* @return a {@link Uni} receiving the {@link BulkWriteResult}
*/
Uni<BulkWriteResult> bulkWrite(ClientSession clientSession,
List<? extends WriteModel<? extends T>> requests,
BulkWriteOptions options);
/**
* Executes a mix of inserts, updates, replaces, and deletes.
*
* @param requests the writes to execute
* @param options the options to apply to the bulk write operation
* @return an Observable with a single element the BulkWriteResult
*/
Observable<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends TDocument>> requests, BulkWriteOptions options);
/**
* Executes a mix of inserts, updates, replaces, and deletes.
*
* @param requests the writes to execute
* @param options the options to apply to the bulk write operation
* @return a publisher with a single element the BulkWriteResult
*/
Publisher<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends TDocument>> requests, BulkWriteOptions options);
/**
* Executes a mix of inserts, updates, replaces, and deletes.
*
* @param clientSession the client session with which to associate this operation
* @param requests the writes to execute
* @param options the options to apply to the bulk write operation
* @return a publisher with a single element the BulkWriteResult
* @mongodb.server.release 3.6
* @since 1.7
*/
Publisher<BulkWriteResult> bulkWrite(ClientSession clientSession, List<? extends WriteModel<? extends TDocument>> requests,
BulkWriteOptions options);