下面列出了 com.mongodb.bulk.BulkWriteError 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Examines the outcome of a bulk write, hands the affected things to {@code acknowledgeFailures},
 * and produces the log entries describing the outcome.
 *
 * <p>Three cases are distinguished, in this order:
 * <ol>
 * <li>the write was not acknowledged: every thing is passed to {@code acknowledgeFailures};</li>
 * <li>a consistency error was detected: every thing is passed to {@code acknowledgeFailures};</li>
 * <li>otherwise: only the things referenced by the individual bulk write errors are passed on.</li>
 * </ol>
 *
 * @param writeResultAndErrors the bulk write result together with its errors.
 * @return the log entries; a single entry in the first two cases, otherwise one summary entry followed by
 * one entry per bulk write error.
 */
private Iterable<String> checkBulkWriteResult(final WriteResultAndErrors writeResultAndErrors) {
    if (wasNotAcknowledged(writeResultAndErrors)) {
        // All failed.
        acknowledgeFailures(getAllThings(writeResultAndErrors));
        final String notAcknowledgedEntry = logResult("NotAcknowledged", writeResultAndErrors, false);
        return Collections.singleton(notAcknowledgedEntry);
    }

    final Optional<String> inconsistency = checkForConsistencyError(writeResultAndErrors);
    if (inconsistency.isPresent()) {
        // write result is not consistent; there is a bug with Ditto or with its environment
        acknowledgeFailures(getAllThings(writeResultAndErrors));
        return Collections.singleton(inconsistency.get());
    }

    final List<BulkWriteError> writeErrors = writeResultAndErrors.getBulkWriteErrors();
    final int errorCount = writeErrors.size();
    // one summary entry plus one entry per error
    final List<String> report = new ArrayList<>(errorCount + 1);
    final List<Metadata> thingsToFail = new ArrayList<>(errorCount);
    report.add(logResult("Acknowledged", writeResultAndErrors, writeErrors.isEmpty()));
    for (final BulkWriteError writeError : writeErrors) {
        // the error index identifies the write model that produced the error
        final Metadata failed =
                writeResultAndErrors.getWriteModels().get(writeError.getIndex()).getMetadata();
        thingsToFail.add(failed);
        report.add(String.format("UpdateFailed for %s due to %s", failed, writeError));
    }
    acknowledgeFailures(thingsToFail);
    return report;
}
/**
 * Feeds a bulk write failure carrying two errors (a duplicate key error at index 3 and a timeout at index 4)
 * through the flow and expects a failure response for the things at both indices as well as a
 * "PartialSuccess" log entry.
 */
@Test
public void partialSuccess() {
    final List<AbstractWriteModel> models = generate5WriteModels();
    final BulkWriteResult acknowledgedResult = BulkWriteResult.acknowledged(1, 2, 1, 2, List.of());
    final BulkWriteError duplicateKey =
            new BulkWriteError(11000, "E11000 duplicate key error", new BsonDocument(), 3);
    final BulkWriteError timeout =
            new BulkWriteError(50, "E50 operation timed out", new BsonDocument(), 4);
    final List<BulkWriteError> errors = List.of(duplicateKey, timeout);
    final MongoBulkWriteException bulkWriteException =
            new MongoBulkWriteException(acknowledgedResult, errors, null, new ServerAddress());

    // WHEN: BulkWriteResultAckFlow receives partial update success with errors, one of which is not duplicate key
    final WriteResultAndErrors resultAndErrors = WriteResultAndErrors.failure(models, bulkWriteException);
    final String firstLogEntry = runBulkWriteResultAckFlowAndGetFirstLogEntry(resultAndErrors);

    // THEN: the non-duplicate-key error triggers a failure acknowledgement
    actorSystem.log().info(firstLogEntry);
    // indices 3 and 4 are the ones referenced by the errors above
    for (final AbstractWriteModel failedModel : models.subList(3, 5)) {
        final UpdateThingResponse response = expectUpdateThingResponse(failedModel.getMetadata().getThingId());
        assertThat(response)
                .describedAs("response is failure")
                .returns(false, UpdateThingResponse::isSuccess);
    }
    assertThat(firstLogEntry).contains("Acknowledged: PartialSuccess");
}
/**
 * Feeds a bulk write failure whose second error refers to index 5 (while only the models returned by
 * {@code generate5WriteModels()} exist) through the flow and expects a failure response for every write
 * model as well as an "indexOutOfBound" consistency error in the first log entry.
 */
@Test
public void errorIndexOutOfBoundError() {
    final List<AbstractWriteModel> models = generate5WriteModels();
    final BulkWriteResult acknowledgedResult = BulkWriteResult.acknowledged(1, 2, 1, 2, List.of());
    final BulkWriteError duplicateKey =
            new BulkWriteError(11000, "E11000 duplicate key error", new BsonDocument(), 0);
    final BulkWriteError outOfBoundTimeout =
            new BulkWriteError(50, "E50 operation timed out", new BsonDocument(), 5);
    final List<BulkWriteError> errors = List.of(duplicateKey, outOfBoundTimeout);
    final MongoBulkWriteException bulkWriteException =
            new MongoBulkWriteException(acknowledgedResult, errors, null, new ServerAddress());

    // WHEN: BulkWriteResultAckFlow receives partial update success with at least 1 error with out-of-bound index
    final WriteResultAndErrors resultAndErrors = WriteResultAndErrors.failure(models, bulkWriteException);
    final String firstLogEntry = runBulkWriteResultAckFlowAndGetFirstLogEntry(resultAndErrors);

    // THEN: All updates are considered failures
    actorSystem.log().info(firstLogEntry);
    for (final AbstractWriteModel model : models) {
        assertThat(expectUpdateThingResponse(model.getMetadata().getThingId()))
                .describedAs("response is failure")
                .returns(false, UpdateThingResponse::isSuccess);
    }
    assertThat(firstLogEntry).contains("ConsistencyError[indexOutOfBound]");
}
/**
 * Creates an instance that simply stores the given values; no copies are made and no validation is done.
 *
 * @param writeModels the write models to store.
 * @param bulkWriteResult the bulk write result to store.
 * @param bulkWriteErrors the bulk write errors to store.
 * @param unexpectedError an unexpected error to store, or {@code null}.
 */
private WriteResultAndErrors(
        final List<AbstractWriteModel> writeModels,
        final BulkWriteResult bulkWriteResult,
        final List<BulkWriteError> bulkWriteErrors,
        @Nullable final Throwable unexpectedError) {

    // what was requested
    this.writeModels = writeModels;

    // what went wrong
    this.unexpectedError = unexpectedError;
    this.bulkWriteErrors = bulkWriteErrors;

    // what was reported back
    this.bulkWriteResult = bulkWriteResult;
}
/**
 * Logs a warning for every note ID found in the write errors of a bulk write exception.
 *
 * <p>Each write error message is searched for the first run of nine upper-case letters or digits,
 * which is taken to be a note ID. Errors whose message contains no such run are skipped silently.
 *
 * @param e the bulk write exception whose write errors are inspected.
 */
private void printDuplicatedException(MongoBulkWriteException e) {
    // Compile the regex once instead of once per write error.
    final Pattern noteIdPattern = Pattern.compile("[A-Z0-9]{9}"); // regex for note ID
    final List<BulkWriteError> errors = e.getWriteErrors();
    for (BulkWriteError error : errors) {
        final Matcher matcher = noteIdPattern.matcher(error.getMessage());
        if (matcher.find()) { // if there were a note ID
            final String noteId = matcher.group();
            LOG.warn("Note " + noteId + " not inserted since already exists in MongoDB");
        }
    }
}
/**
 * Ensures the given exception has been generated due to a duplicate (primary) key.
 * Differentiates between Fongo and Mongo exceptions since the behaviour under these databases
 * is different.
 *
 * <p>If the exception is not itself a {@code MongoException}, its cause is inspected instead.
 * The following are accepted:
 * <ul>
 * <li>a {@code DuplicateKeyException};</li>
 * <li>a {@code MongoCommandException} with code name "DuplicateKey" and error code 11000;</li>
 * <li>a {@code MongoBulkWriteException} with exactly one write error that has code 11000 and a message
 * containing "duplicate key".</li>
 * </ul>
 * Anything else fails the assertion.
 *
 * @param exception the exception to verify; must not be {@code null}.
 */
public static void assertDuplicateKeyException(Throwable exception) {
    Preconditions.checkNotNull(exception, "exception");

    // unwrap, if necessary; the original is kept so it can still be reported when there is no cause
    final Throwable unwrapped = exception instanceof MongoException ? exception : exception.getCause();

    // fongo throws directly DuplicateKeyException
    if (unwrapped instanceof DuplicateKeyException) {
        return;
    }

    // MongoDB throws custom exception
    if (unwrapped instanceof MongoCommandException) {
        final MongoCommandException commandException = (MongoCommandException) unwrapped;
        final String codeName = commandException.getResponse().get("codeName").asString().getValue();
        final int errorCode = commandException.getErrorCode();
        check(codeName).is("DuplicateKey");
        check(errorCode).is(11000); // code 11000 stands for DuplicateKeyException
        // all good here (can return)
        return;
    }

    // for bulk writes as well
    if (unwrapped instanceof MongoBulkWriteException) {
        final List<BulkWriteError> errors = ((MongoBulkWriteException) unwrapped).getWriteErrors();
        check(errors).hasSize(1);
        final BulkWriteError onlyError = errors.get(0);
        check(onlyError.getCode()).is(11000);
        check(onlyError.getMessage()).contains("duplicate key");
        return;
    }

    // if we got here means there is a problem (no duplicate key exception);
    // report the original exception when it had no cause, rather than "null"
    fail("Should get duplicate key exception after " + (unwrapped != null ? unwrapped : exception));
}
/**
 * Checks whether the index of every bulk write error lies in the range {@code [0, requested)}.
 *
 * @param bulkWriteErrors the bulk write errors whose indexes are checked.
 * @param requested the exclusive upper bound for a valid index.
 * @return {@code true} if all indexes are within bounds (also when there are no errors),
 * {@code false} as soon as one index is negative or not smaller than {@code requested}.
 */
private static boolean areAllIndexesWithinBounds(final List<BulkWriteError> bulkWriteErrors, final int requested) {
    for (final BulkWriteError writeError : bulkWriteErrors) {
        final int index = writeError.getIndex();
        if (index < 0 || index >= requested) {
            return false;
        }
    }
    return true;
}
/**
 * Returns the bulk write errors held by this object.
 * The stored list itself is returned, not a copy.
 *
 * @return the bulk write errors.
 */
public List<BulkWriteError> getBulkWriteErrors() {
    return this.bulkWriteErrors;
}