下面列出了 com.mongodb.bulk.BulkWriteResult 类的 API 用法示例代码；也可以点击链接到 GitHub 查看源代码。
/**
 * Sends a mixed batch (three inserts, one update, one delete, one replace) through
 * {@code bulkWrite} on the "test" collection and checks the deleted and inserted
 * counters reported by the result.
 */
@Test
void bulkWrite() {
    ReactiveMongoCollection<Document> testCollection =
            client.getDatabase(DATABASE).getCollection("test");

    BulkWriteResult outcome = testCollection
            .bulkWrite(Arrays.asList(
                    // three new documents
                    new InsertOneModel<>(new Document("_id", 4)),
                    new InsertOneModel<>(new Document("_id", 5)),
                    new InsertOneModel<>(new Document("_id", 6)),
                    // set x=2 on _id 1
                    new UpdateOneModel<>(
                            new Document("_id", 1),
                            new Document("$set", new Document("x", 2))),
                    // delete _id 2
                    new DeleteOneModel<>(new Document("_id", 2)),
                    // replace _id 3
                    new ReplaceOneModel<>(
                            new Document("_id", 3),
                            new Document("_id", 3).append("x", 4))))
            .await()
            .indefinitely();

    assertThat(outcome.getDeletedCount()).isEqualTo(0);
    assertThat(outcome.getInsertedCount()).isEqualTo(3);
}
/**
 * Same mixed batch as the option-less variant, but submitted with
 * {@code BulkWriteOptions.ordered(true)}; checks the deleted and inserted
 * counters reported by the result.
 */
@Test
void bulkWriteWithOptions() {
    ReactiveMongoCollection<Document> testCollection =
            client.getDatabase(DATABASE).getCollection("test");

    BulkWriteResult outcome = testCollection
            .bulkWrite(
                    Arrays.asList(
                            // three new documents
                            new InsertOneModel<>(new Document("_id", 4)),
                            new InsertOneModel<>(new Document("_id", 5)),
                            new InsertOneModel<>(new Document("_id", 6)),
                            // set x=2 on _id 1
                            new UpdateOneModel<>(
                                    new Document("_id", 1),
                                    new Document("$set", new Document("x", 2))),
                            // delete _id 2
                            new DeleteOneModel<>(new Document("_id", 2)),
                            // replace _id 3
                            new ReplaceOneModel<>(
                                    new Document("_id", 3),
                                    new Document("_id", 3).append("x", 4))),
                    new BulkWriteOptions().ordered(true))
            .await()
            .indefinitely();

    assertThat(outcome.getDeletedCount()).isEqualTo(0);
    assertThat(outcome.getInsertedCount()).isEqualTo(3);
}
/**
 * Sends the pending write models to the associated collection as one bulk write.
 *
 * @return {@code true} when there are no write models to send; otherwise whether
 *         the bulk write result was acknowledged
 * @throws IllegalStateException if no collection is associated
 */
@Override
boolean commit() {
    final MongoCollection<DocumentT> target = getCollection();
    final List<WriteModel<DocumentT>> pending = getBulkWriteModels();
    if (target == null) {
        throw new IllegalStateException("cannot commit a container with no associated collection");
    }
    // Nothing queued counts as a successful commit.
    if (pending.isEmpty()) {
        return true;
    }
    return target.bulkWrite(pending).wasAcknowledged();
}
/**
 * Feeds a fully successful bulk write result into the acknowledgement flow and
 * expects the first log entry to report success.
 */
@Test
public void allSuccess() {
    // GIVEN: write models and an acknowledged result that carries two upserts
    final List<AbstractWriteModel> models = generate5WriteModels();
    final List<BulkWriteUpsert> upserts = List.of(
            new BulkWriteUpsert(0, new BsonString("upsert 0")),
            new BulkWriteUpsert(4, new BsonString("upsert 4")));
    final BulkWriteResult acknowledged = BulkWriteResult.acknowledged(0, 3, 1, 1, upserts);

    // WHEN
    final String firstLogEntry = runBulkWriteResultAckFlowAndGetFirstLogEntry(
            WriteResultAndErrors.success(models, acknowledged));

    // THEN
    actorSystem.log().info(firstLogEntry);
    assertThat(firstLogEntry).contains("Acknowledged: Success");
}
/**
 * Feeds a bulk write failure with two write errors (indices 3 and 4) into the
 * acknowledgement flow and expects failure responses for those two write models
 * plus a partial-success log entry.
 */
@Test
public void partialSuccess() {
    final List<AbstractWriteModel> models = generate5WriteModels();
    final BulkWriteResult partialResult = BulkWriteResult.acknowledged(1, 2, 1, 2, List.of());
    final BulkWriteError duplicateKey =
            new BulkWriteError(11000, "E11000 duplicate key error", new BsonDocument(), 3);
    final BulkWriteError timedOut =
            new BulkWriteError(50, "E50 operation timed out", new BsonDocument(), 4);
    final List<BulkWriteError> writeErrors = List.of(duplicateKey, timedOut);

    // WHEN: BulkWriteResultAckFlow receives partial update success with errors, one of which is not duplicate key
    final MongoBulkWriteException bulkFailure =
            new MongoBulkWriteException(partialResult, writeErrors, null, new ServerAddress());
    final String firstLogEntry = runBulkWriteResultAckFlowAndGetFirstLogEntry(
            WriteResultAndErrors.failure(models, bulkFailure));

    // THEN: the non-duplicate-key error triggers a failure acknowledgement
    actorSystem.log().info(firstLogEntry);
    int index = 3;
    while (index < 5) {
        assertThat(expectUpdateThingResponse(models.get(index).getMetadata().getThingId()))
                .describedAs("response is failure")
                .returns(false, UpdateThingResponse::isSuccess);
        ++index;
    }
    assertThat(firstLogEntry).contains("Acknowledged: PartialSuccess");
}
/**
 * Feeds a bulk write failure whose second write error carries index 5 into the
 * acknowledgement flow and expects a failure response for every write model plus
 * an index-out-of-bound consistency error in the log.
 */
@Test
public void errorIndexOutOfBoundError() {
    final List<AbstractWriteModel> models = generate5WriteModels();
    final BulkWriteResult partialResult = BulkWriteResult.acknowledged(1, 2, 1, 2, List.of());
    final BulkWriteError duplicateKey =
            new BulkWriteError(11000, "E11000 duplicate key error", new BsonDocument(), 0);
    final BulkWriteError timedOut =
            new BulkWriteError(50, "E50 operation timed out", new BsonDocument(), 5);
    final List<BulkWriteError> writeErrors = List.of(duplicateKey, timedOut);

    // WHEN: BulkWriteResultAckFlow receives partial update success with at least 1 error with out-of-bound index
    final MongoBulkWriteException bulkFailure =
            new MongoBulkWriteException(partialResult, writeErrors, null, new ServerAddress());
    final String firstLogEntry = runBulkWriteResultAckFlowAndGetFirstLogEntry(
            WriteResultAndErrors.failure(models, bulkFailure));

    // THEN: All updates are considered failures
    actorSystem.log().info(firstLogEntry);
    for (final AbstractWriteModel model : models) {
        assertThat(expectUpdateThingResponse(model.getMetadata().getThingId()))
                .describedAs("response is failure")
                .returns(false, UpdateThingResponse::isSuccess);
    }
    assertThat(firstLogEntry).contains("ConsistencyError[indexOutOfBound]");
}
/**
 * Deletes the documents whose {@code _id} equals one of the given ids, using a
 * single unordered bulk write of delete-one operations.
 *
 * @param ids the {@code _id} values to delete
 * @return the deleted count reported by the bulk write result
 */
@Override
public long bulkDelete(List<?> ids) {
    var stopWatch = new StopWatch();
    int idCount = ids.size();
    int deleted = 0;
    try {
        List<DeleteOneModel<T>> deletes = new ArrayList<>(idCount);
        ids.forEach(id -> deletes.add(new DeleteOneModel<>(Filters.eq("_id", id))));
        deleted = collection()
                .bulkWrite(deletes, new BulkWriteOptions().ordered(false))
                .getDeletedCount();
        return deleted;
    } finally {
        // Runs on both success and failure; on failure the deleted count is still 0.
        long elapsedTime = stopWatch.elapsed();
        ActionLogContext.track("mongo", elapsedTime, 0, deleted);
        logger.debug("bulkDelete, collection={}, ids={}, size={}, deletedRows={}, elapsed={}", collectionName, ids, idCount, deleted, elapsedTime);
        checkSlowOperation(elapsedTime);
    }
}
/**
 * Builds a one-line summary of a bulk write.
 *
 * @param context     label appended after the processed count
 * @param result      the bulk write result to summarise
 * @param objectCount number of objects that were submitted
 * @return "processed N context" followed either by insert/update counts
 *         (updates = matched count, inserts = objectCount - updates) or by a note
 *         that the result was not acknowledged
 */
public static String toMessage(final String context,
                               final BulkWriteResult result,
                               final int objectCount) {
    final StringBuilder text = new StringBuilder(128)
            .append("processed ").append(objectCount).append(" ").append(context);
    if (!result.wasAcknowledged()) {
        return text.append(" (result NOT acknowledged)").toString();
    }
    final int updateCount = result.getMatchedCount();
    final int insertCount = objectCount - updateCount;
    return text
            .append(" with ").append(insertCount).append(" inserts and ")
            .append(updateCount).append(" updates")
            .toString();
}
/**
 * Converts a driver {@code BulkWriteResult} into a {@code MongoClientBulkWriteResult}.
 *
 * @param bulkWriteResult the driver result
 * @return the converted result, or {@code null} when the write was not acknowledged
 */
static MongoClientBulkWriteResult toMongoClientBulkWriteResult(BulkWriteResult bulkWriteResult) {
    if (bulkWriteResult.wasAcknowledged()) {
        // Each upsert becomes its converted id object with the operation index added.
        List<JsonObject> upserts = bulkWriteResult.getUpserts()
                .stream()
                .map(entry -> {
                    JsonObject converted = convertUpsertId(entry.getId());
                    converted.put(MongoClientBulkWriteResult.INDEX, entry.getIndex());
                    return converted;
                })
                .collect(Collectors.toList());
        return new MongoClientBulkWriteResult(
                bulkWriteResult.getInsertedCount(),
                bulkWriteResult.getMatchedCount(),
                bulkWriteResult.getDeletedCount(),
                bulkWriteResult.getModifiedCount(),
                upserts);
    }
    return null;
}
/**
* Creates a new instance of the WriteBatchResult from Mongo's raw BulkWriteResult
*
* @param writeResult The BulkWriteResult returned from Mongo
*/
public WriteBatchResult(BulkWriteResult writeResult) {
this.totalUpdates =
writeResult.getInsertedCount()
+ writeResult.getModifiedCount()
+ writeResult.getUpserts().size();
this.totalDeletes = writeResult.getDeletedCount();
this.isReadOnly = false;
}
/**
 * Writes a batch of edits to the collection in one bulk write, unless the
 * database is read only, and logs any mismatch between expected and actual counts.
 *
 * @param edits The edits to write
 * @return A summary of the write results
 */
private WriteBatchResult doBulkWrite(WriteBatch edits) {
    if (this.isReadOnly) {
        LOG.info("Skipping writing because database is read only");
        return new WriteBatchResult(true);
    }

    WriteBatchResult summary =
            new WriteBatchResult(this.collection.bulkWrite(this.clientSession, edits.getEdits()));

    // Mismatches are only reported at debug level; they do not fail the write.
    if (edits.getDeleteCount() != summary.totalDeletes) {
        LOG.debug(
                "Expected {} deletes but only deleted {}",
                edits.getDeleteCount(),
                summary.totalDeletes);
    }
    if (edits.getUpdateCount() != summary.totalUpdates) {
        LOG.debug(
                "Expected {} upserts but only got {}",
                edits.getUpdateCount(),
                summary.totalUpdates);
    }

    LOG.debug("Successfully wrote {} edits", edits.getEdits().size());
    return summary;
}
/**
 * Writes one batch of sink records into the given collection with a single bulk write.
 *
 * <p>The write models are built by the CDC builder when the sink config reports a CDC
 * handler for the collection, otherwise by the regular builder. An empty model list
 * results in no write.
 *
 * <p>On a {@code MongoException} the failure is logged. If no retries remain a
 * {@code ConnectException} is thrown; otherwise the retry is deferred via
 * {@code context.timeout(deferRetryMs)} and a {@code RetriableException} is thrown.
 *
 * @param collection the target collection
 * @param batch      the sink records to write
 */
private void processSinkRecords(MongoCollection<BsonDocument> collection, List<SinkRecord> batch) {
    String collectionName = collection.getNamespace().getCollectionName();
    List<? extends WriteModel<BsonDocument>> docsToWrite =
            sinkConfig.isUsingCdcHandler(collectionName)
                    ? buildWriteModelCDC(batch, collectionName)
                    : buildWriteModel(batch, collectionName);
    try {
        if (!docsToWrite.isEmpty()) {
            LOGGER.debug("bulk writing {} document(s) into collection [{}]",
                    docsToWrite.size(), collection.getNamespace().getFullName());
            BulkWriteResult result = collection.bulkWrite(
                    docsToWrite, BULK_WRITE_OPTIONS);
            // Parameterized so the result is only stringified when debug is enabled.
            LOGGER.debug("mongodb bulk write result: {}", result);
        }
    } catch (MongoException mexc) {
        if (mexc instanceof BulkWriteException) {
            BulkWriteException bwe = (BulkWriteException) mexc;
            LOGGER.error("mongodb bulk write (partially) failed", bwe);
            // String.valueOf tolerates null: the write concern error may be absent,
            // and a NullPointerException here would mask the original failure and
            // bypass the retry handling below.
            LOGGER.error(String.valueOf(bwe.getWriteResult()));
            LOGGER.error(String.valueOf(bwe.getWriteErrors()));
            LOGGER.error(String.valueOf(bwe.getWriteConcernError()));
        } else {
            LOGGER.error("error on mongodb operation", mexc);
            LOGGER.error("writing {} document(s) into collection [{}] failed -> remaining retries ({})",
                    docsToWrite.size(), collection.getNamespace().getFullName(), remainingRetries);
        }
        // Post-decrement: the check uses the count before this attempt is consumed.
        if (remainingRetries-- <= 0) {
            throw new ConnectException("failed to write mongodb documents"
                    + " despite retrying -> GIVING UP! :( :( :(", mexc);
        }
        LOGGER.debug("deferring retry operation for {}ms", deferRetryMs);
        context.timeout(deferRetryMs);
        throw new RetriableException(mexc.getMessage(), mexc);
    }
}
/**
 * Stores the given values in the corresponding fields without copying or validating them.
 *
 * @param writeModels the write models
 * @param bulkWriteResult the bulk write result
 * @param bulkWriteErrors the bulk write errors
 * @param unexpectedError an unexpected error; may be {@code null}
 */
private WriteResultAndErrors(
final List<AbstractWriteModel> writeModels,
final BulkWriteResult bulkWriteResult,
final List<BulkWriteError> bulkWriteErrors,
@Nullable final Throwable unexpectedError) {
this.writeModels = writeModels;
this.bulkWriteResult = bulkWriteResult;
this.bulkWriteErrors = bulkWriteErrors;
this.unexpectedError = unexpectedError;
}
/**
 * Runs the stream-restart scenario with a mocked {@code MongoBulkWriteException}
 * whose write result is a mocked {@code BulkWriteResult}.
 */
@Test
public void streamIsRestartableAfterMongoBulkWriteException() throws Exception {
    final BulkWriteResult mockedResult = Mockito.mock(BulkWriteResult.class);
    final MongoBulkWriteException mockedFailure = Mockito.mock(MongoBulkWriteException.class);
    Mockito.when(mockedFailure.getWriteResult()).thenReturn(mockedResult);

    // The same exception instance is supplied for every failure.
    testStreamRestart(() -> mockedFailure);
}
/**
 * Wraps the callback-based {@code bulkWrite} of the wrapped collection in an Observable.
 *
 * @param requests the write models to execute
 * @param options  the bulk write options
 * @return an Observable built from the callback-based operation via {@code observableAdapter}
 */
@Override
public Observable<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests,
                                             final BulkWriteOptions options) {
    final Block<SingleResultCallback<BulkWriteResult>> operation =
            new Block<SingleResultCallback<BulkWriteResult>>() {
                @Override
                public void apply(final SingleResultCallback<BulkWriteResult> resultCallback) {
                    wrapped.bulkWrite(requests, options, resultCallback);
                }
            };
    return RxObservables.create(Observables.observe(operation), observableAdapter);
}
/**
 * Wraps the callback-based {@code bulkWrite} of the wrapped collection in a Publisher.
 *
 * @param requests the write models to execute
 * @param options  the bulk write options
 * @return a Publisher adapting the observable created from the callback-based operation
 */
@Override
public Publisher<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests,
                                            final BulkWriteOptions options) {
    final Block<com.mongodb.async.SingleResultCallback<BulkWriteResult>> operation =
            new Block<com.mongodb.async.SingleResultCallback<BulkWriteResult>>() {
                @Override
                public void apply(final com.mongodb.async.SingleResultCallback<BulkWriteResult> resultCallback) {
                    wrapped.bulkWrite(requests, options, resultCallback);
                }
            };
    return new ObservableToPublisher<BulkWriteResult>(
            com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Wraps the session-aware, callback-based {@code bulkWrite} of the wrapped collection
 * in a Publisher.
 *
 * @param clientSession the client session; its wrapped session is passed to the wrapped collection
 * @param requests      the write models to execute
 * @param options       the bulk write options
 * @return a Publisher adapting the observable created from the callback-based operation
 */
@Override
public Publisher<BulkWriteResult> bulkWrite(final ClientSession clientSession,
                                            final List<? extends WriteModel<? extends TDocument>> requests,
                                            final BulkWriteOptions options) {
    final Block<com.mongodb.async.SingleResultCallback<BulkWriteResult>> operation =
            new Block<com.mongodb.async.SingleResultCallback<BulkWriteResult>>() {
                @Override
                public void apply(final com.mongodb.async.SingleResultCallback<BulkWriteResult> resultCallback) {
                    // The session is unwrapped only when the operation is actually applied.
                    wrapped.bulkWrite(clientSession.getWrapped(), requests, options, resultCallback);
                }
            };
    return new ObservableToPublisher<BulkWriteResult>(
            com.mongodb.async.client.Observables.observe(operation));
}
/**
 * Executes the given bulk operations against the named collection.
 *
 * @param collection       the collection name; must not be null
 * @param operations       the operations to execute; must not be null
 * @param bulkWriteOptions write option and ordering to use; must not be null
 * @return a future completed from the driver result mapped through
 *         {@code Utils::toMongoClientBulkWriteResult}
 */
@Override
public Future<@Nullable MongoClientBulkWriteResult> bulkWriteWithOptions(String collection, List<BulkOperation> operations, BulkWriteOptions bulkWriteOptions) {
    requireNonNull(collection, "collection cannot be null");
    requireNonNull(operations, "operations cannot be null");
    requireNonNull(bulkWriteOptions, "bulkWriteOptions cannot be null");

    MongoCollection<JsonObject> target = getCollection(collection, bulkWriteOptions.getWriteOption());
    List<WriteModel<JsonObject>> writeModels = convertBulkOperations(operations);

    Promise<BulkWriteResult> resultPromise = vertx.promise();
    target
        .bulkWrite(
            writeModels,
            new com.mongodb.client.model.BulkWriteOptions().ordered(bulkWriteOptions.isOrdered()))
        .subscribe(new SingleResultSubscriber<>(resultPromise));
    return resultPromise.future().map(Utils::toMongoClientBulkWriteResult);
}
/**
 * Replaces each value of the operation through one
 * <a href="https://docs.mongodb.com/manual/reference/method/db.collection.bulkWrite/">bulkWrite</a> of
 * <a href="https://docs.mongodb.com/manual/reference/method/db.collection.replaceOne/">replaceOne</a> models.
 * Each value is matched by a filter on the {@code _id} field, built from the key extracted from the
 * value and converted to a BSON value. The driver result is discarded and reported as
 * {@code WriteResult.unknown()}.
 */
private <T> Publisher<WriteResult> update(StandardOperations.Update operation) {
    ReplaceOptions options = new ReplaceOptions();
    if (operation.upsert()) {
        options.upsert(operation.upsert());
    }

    List<ReplaceOneModel<Object>> replacements = operation.values().stream()
            .map(value -> {
                BsonDocument idFilter =
                        new BsonDocument(Mongos.ID_FIELD_NAME, toBsonValue(keyExtractor.extract(value)));
                return new ReplaceOneModel<>(idFilter, value, options);
            })
            .collect(Collectors.toList());

    Publisher<BulkWriteResult> bulkResult = ((MongoCollection<Object>) collection).bulkWrite(replacements);
    return Flowable.fromPublisher(bulkResult).map(ignored -> WriteResult.unknown());
}
/**
 * Runs a bulk write against the named collection.
 *
 * @param collName  name passed to {@code getCollection}
 * @param modelList the write models to execute
 * @return the result returned by the collection's {@code bulkWrite}
 */
public BulkWriteResult bulkWrite(String collName, List<? extends WriteModel<? extends Document>> modelList) {
    return getCollection(collName).bulkWrite(modelList);
}
/**
 * Runs a bulk write with explicit options against the named collection.
 *
 * @param collName  name passed to {@code getCollection}
 * @param modelList the write models to execute
 * @param options   the bulk write options
 * @return the result returned by the collection's {@code bulkWrite}
 */
public BulkWriteResult bulkWrite(String collName, List<? extends WriteModel<? extends Document>> modelList, BulkWriteOptions options) {
    return getCollection(collName).bulkWrite(modelList, options);
}
/**
 * Passes the requests to the underlying collection's {@code bulkWrite} and adapts
 * what it returns with {@code Wrappers.toUni}.
 *
 * @param requests the write models to execute
 * @return the adapted bulk write result
 */
@Override
public Uni<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends T>> requests) {
return Wrappers.toUni(collection.bulkWrite(requests));
}
/**
 * Passes the requests and options to the underlying collection's {@code bulkWrite}
 * and adapts what it returns with {@code Wrappers.toUni}.
 *
 * @param requests the write models to execute
 * @param options  the bulk write options
 * @return the adapted bulk write result
 */
@Override
public Uni<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends T>> requests,
BulkWriteOptions options) {
return Wrappers.toUni(collection.bulkWrite(requests, options));
}
/**
 * Passes the session and requests to the underlying collection's {@code bulkWrite}
 * and adapts what it returns with {@code Wrappers.toUni}.
 *
 * @param clientSession the client session forwarded to the underlying collection
 * @param requests      the write models to execute
 * @return the adapted bulk write result
 */
@Override
public Uni<BulkWriteResult> bulkWrite(ClientSession clientSession,
List<? extends WriteModel<? extends T>> requests) {
return Wrappers.toUni(collection.bulkWrite(clientSession, requests));
}
/**
 * Passes the session, requests and options to the underlying collection's
 * {@code bulkWrite} and adapts what it returns with {@code Wrappers.toUni}.
 *
 * @param clientSession the client session forwarded to the underlying collection
 * @param requests      the write models to execute
 * @param options       the bulk write options
 * @return the adapted bulk write result
 */
@Override
public Uni<BulkWriteResult> bulkWrite(ClientSession clientSession,
List<? extends WriteModel<? extends T>> requests, BulkWriteOptions options) {
return Wrappers.toUni(collection.bulkWrite(clientSession, requests, options));
}
/**
 * Shared scenario: a mocked collection whose bulk write publisher signals the supplied
 * error to every subscriber is put behind a MongoSearchUpdaterFlow wrapped in a
 * zero-backoff RestartSink. 25 single-element sources are streamed through it, and the
 * test asserts that all 25 were pulled from the stream within 5 seconds.
 *
 * @param errorSupplier supplies the error signalled on each subscription
 */
// unchecked: the mock of the raw MongoCollection class is assigned to MongoCollection<Document>
@SuppressWarnings("unchecked")
private void testStreamRestart(final Supplier<Throwable> errorSupplier) throws Exception {
new TestKit(actorSystem) {{
// GIVEN: The persistence fails with an error on every write
final MongoDatabase db = Mockito.mock(MongoDatabase.class);
final MongoCollection<Document> collection = Mockito.mock(MongoCollection.class);
// Publisher that immediately fails each subscriber with a freshly supplied error
final Publisher<BulkWriteResult> publisher = s -> s.onError(errorSupplier.get());
Mockito.when(db.getCollection(Mockito.any())).thenReturn(collection);
Mockito.when(collection.bulkWrite(Mockito.any(), Mockito.any(BulkWriteOptions.class)))
.thenReturn(publisher);
// GIVEN: MongoSearchUpdaterFlow is wrapped inside a RestartSink
final MongoSearchUpdaterFlow flow = MongoSearchUpdaterFlow.of(db);
final Sink<Source<AbstractWriteModel, NotUsed>, ?> sink =
flow.start(1, 1, Duration.ZERO).to(Sink.ignore());
final Sink<Source<AbstractWriteModel, NotUsed>, ?> restartSink =
RestartSink.withBackoff(Duration.ZERO, Duration.ZERO, 1.0, () -> sink);
// WHEN: Many changes stream through MongoSearchUpdaterFlow
final int numberOfChanges = 25;
final CountDownLatch latch = new CountDownLatch(numberOfChanges);
final AbstractWriteModel abstractWriteModel = Mockito.mock(AbstractWriteModel.class);
final WriteModel<Document> mongoWriteModel = new DeleteOneModel<>(new Document());
Mockito.when(abstractWriteModel.toMongo()).thenReturn(mongoWriteModel);
Source.repeat(Source.single(abstractWriteModel))
.take(numberOfChanges)
.buffer(1, OverflowStrategy.backpressure())
.map(source -> {
// count each element as it passes this stage on its way to the sink
latch.countDown();
return source;
})
.runWith(restartSink, ActorMaterializer.create(actorSystem));
// THEN: MongoSearchUpdaterFlow should keep restarting and keep consuming changes from the stream
// The await result is ignored; the count assertion below detects a timeout.
latch.await(5L, TimeUnit.SECONDS);
assertThat(latch.getCount()).isZero();
}};
}
/**
 * Delegates to the overload taking options, passing a newly constructed
 * {@code BulkWriteOptions}.
 *
 * @param requests the write models to execute
 * @return the Observable returned by the delegated overload
 */
@Override
public Observable<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests) {
return bulkWrite(requests, new BulkWriteOptions());
}
/**
 * Delegates to the overload taking options, passing a newly constructed
 * {@code BulkWriteOptions}.
 *
 * @param requests the write models to execute
 * @return the Publisher returned by the delegated overload
 */
@Override
public Publisher<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests) {
return bulkWrite(requests, new BulkWriteOptions());
}
/**
 * Delegates to the session-aware overload taking options, passing a newly
 * constructed {@code BulkWriteOptions}.
 *
 * @param clientSession the client session forwarded to the delegated overload
 * @param requests      the write models to execute
 * @return the Publisher returned by the delegated overload
 */
@Override
public Publisher<BulkWriteResult> bulkWrite(final ClientSession clientSession,
final List<? extends WriteModel<? extends TDocument>> requests) {
return bulkWrite(clientSession, requests, new BulkWriteOptions());
}
/**
 * Upserts the given matches into the collection identified by {@code collectionId}
 * using a single bulk write of replace-one models. Each match is normalized first and
 * is keyed by its pGroupId, pId, qGroupId and qId. An empty list is a no-op after
 * parameter validation and entry logging.
 *
 * @param collectionId identifies the target match collection
 * @param matchesList  the matches to save
 * @throws IllegalArgumentException declared by this method; presumably raised by the
 *         required-parameter validation (confirm against MongoUtil)
 */
public void saveMatches(final MatchCollectionId collectionId,
                        final List<CanvasMatches> matchesList)
        throws IllegalArgumentException {

    MongoUtil.validateRequiredParameter("matchesList", matchesList);

    LOG.debug("saveMatches: entry, collectionId={}, matchesList.size()={}",
              collectionId, matchesList.size());

    if (matchesList.isEmpty()) {
        return;
    }

    final MongoCollection<Document> matchCollection =
            matchDatabase.getCollection(collectionId.getDbCollectionName());
    ensureMatchIndexes(matchCollection);

    final UpdateOptions upsert = new UpdateOptions().upsert(true);
    final List<WriteModel<Document>> replacements = new ArrayList<>(matchesList.size());

    for (final CanvasMatches match : matchesList) {
        // Normalize before reading the ids so the filter and the stored JSON agree.
        match.normalize();
        final Document key = new Document("pGroupId", match.getpGroupId())
                .append("pId", match.getpId())
                .append("qGroupId", match.getqGroupId())
                .append("qId", match.getqId());
        final Document replacement = Document.parse(match.toJson());
        replacements.add(new ReplaceOneModel<>(key, replacement, upsert));
    }

    final BulkWriteResult bulkResult = matchCollection.bulkWrite(replacements, MongoUtil.UNORDERED_OPTION);

    if (LOG.isDebugEnabled()) {
        final String summary = MongoUtil.toMessage("matches", bulkResult, matchesList.size());
        LOG.debug("saveMatches: {} using {}.initializeUnorderedBulkOp()",
                  summary, MongoUtil.fullName(matchCollection));
    }
}