下面列出了 com.mongodb.MongoBulkWriteException 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Runs all given write models as one unordered bulk write and wraps the outcome.
 *
 * <p>A successful bulk write is mapped to a success result. If the stream fails, it is
 * recovered (at most once) with a single element: a failure result for a
 * {@code MongoBulkWriteException}, or an unexpected-error result for any other throwable.
 *
 * @param abstractWriteModels the write models to execute.
 * @return a source emitting the write result together with any errors.
 */
private Source<WriteResultAndErrors, NotUsed> executeBulkWrite(
        final List<AbstractWriteModel> abstractWriteModels) {

    final List<WriteModel<Document>> mongoModels = abstractWriteModels.stream()
            .map(AbstractWriteModel::toMongo)
            .collect(Collectors.toList());
    final BulkWriteOptions unorderedOptions = new BulkWriteOptions().ordered(false);

    return Source.fromPublisher(collection.bulkWrite(mongoModels, unorderedOptions))
            .map(writeResult -> WriteResultAndErrors.success(abstractWriteModels, writeResult))
            .recoverWithRetries(1, new PFBuilder<Throwable, Source<WriteResultAndErrors, NotUsed>>()
                    .match(MongoBulkWriteException.class, bulkFailure ->
                            Source.single(WriteResultAndErrors.failure(abstractWriteModels, bulkFailure)))
                    .matchAny(otherFailure ->
                            Source.single(WriteResultAndErrors.unexpectedError(abstractWriteModels, otherFailure)))
                    .build());
}
/**
 * A bulk write reporting errors at indices 3 and 4 (one of them not a duplicate-key error)
 * must produce failure responses for those two write models and a "PartialSuccess" log entry.
 */
@Test
public void partialSuccess() {
    final List<AbstractWriteModel> writeModels = generate5WriteModels();
    final List<BulkWriteError> writeErrors = List.of(
            new BulkWriteError(11000, "E11000 duplicate key error", new BsonDocument(), 3),
            new BulkWriteError(50, "E50 operation timed out", new BsonDocument(), 4)
    );
    final BulkWriteResult acknowledged = BulkWriteResult.acknowledged(1, 2, 1, 2, List.of());
    final MongoBulkWriteException bulkFailure =
            new MongoBulkWriteException(acknowledged, writeErrors, null, new ServerAddress());

    // WHEN: BulkWriteResultAckFlow receives partial update success with errors,
    // one of which is not a duplicate-key error
    final String firstLogEntry =
            runBulkWriteResultAckFlowAndGetFirstLogEntry(WriteResultAndErrors.failure(writeModels, bulkFailure));

    // THEN: the write models at the reported error indices are answered with failures
    actorSystem.log().info(firstLogEntry);
    for (int failedIndex = 3; failedIndex < 5; ++failedIndex) {
        final AbstractWriteModel failedModel = writeModels.get(failedIndex);
        assertThat(expectUpdateThingResponse(failedModel.getMetadata().getThingId()))
                .describedAs("response is failure")
                .returns(false, UpdateThingResponse::isSuccess);
    }
    assertThat(firstLogEntry).contains("Acknowledged: PartialSuccess");
}
/**
 * A bulk write error whose index (5) lies outside the 5 requested write models must cause
 * every write model to be answered with a failure and an index-out-of-bound consistency
 * error to be logged.
 */
@Test
public void errorIndexOutOfBoundError() {
    final List<AbstractWriteModel> writeModels = generate5WriteModels();
    final List<BulkWriteError> writeErrors = List.of(
            new BulkWriteError(11000, "E11000 duplicate key error", new BsonDocument(), 0),
            new BulkWriteError(50, "E50 operation timed out", new BsonDocument(), 5)
    );
    final BulkWriteResult acknowledged = BulkWriteResult.acknowledged(1, 2, 1, 2, List.of());
    final MongoBulkWriteException bulkFailure =
            new MongoBulkWriteException(acknowledged, writeErrors, null, new ServerAddress());

    // WHEN: BulkWriteResultAckFlow receives partial update success with at least
    // one error carrying an out-of-bound index
    final String firstLogEntry =
            runBulkWriteResultAckFlowAndGetFirstLogEntry(WriteResultAndErrors.failure(writeModels, bulkFailure));

    // THEN: all updates are considered failures
    actorSystem.log().info(firstLogEntry);
    for (final AbstractWriteModel model : writeModels) {
        assertThat(expectUpdateThingResponse(model.getMetadata().getThingId()))
                .describedAs("response is failure")
                .returns(false, UpdateThingResponse::isSuccess);
    }
    assertThat(firstLogEntry).contains("ConsistencyError[indexOutOfBound]");
}
/**
 * Writes the buffered batch to the configured collection and then empties the buffer.
 *
 * <p>Does nothing when the batch is empty. A {@code MongoBulkWriteException} is rethrown only
 * when the write is configured as ordered; for unordered writes it is ignored and the batch
 * is still cleared.
 */
private void flush() {
    if (!batch.isEmpty()) {
        final MongoCollection<Document> target =
                client.getDatabase(spec.database()).getCollection(spec.collection());
        final InsertManyOptions insertOptions = new InsertManyOptions().ordered(spec.ordered());
        try {
            target.insertMany(batch, insertOptions);
        } catch (MongoBulkWriteException bulkFailure) {
            // Unordered mode: write errors are deliberately not propagated.
            if (spec.ordered()) {
                throw bulkFailure;
            }
        }
        batch.clear();
    }
}
/**
 * Records which indices failed after an insertion error, then completes.
 *
 * <p>For a {@code MongoBulkWriteException} only the indices reported by its write errors are
 * marked as failed and removed from the indices to acknowledge. For any other non-null error,
 * every index pending acknowledgement is marked as failed. {@code onComplete()} is always
 * invoked afterwards, even when {@code t} is null.
 *
 * @param t the error reported for the insertion; may be null.
 */
@Override
public void onError(Throwable t) {
    if (t == null) {
        this.onComplete();
        return;
    }

    log.error("MongoDB insertion error", t);
    if (!(t instanceof MongoBulkWriteException)) {
        // No per-item information available: everything pending is considered failed.
        idxToFail.addAll(idxToAck);
        idxToAck.clear();
    } else {
        // The exception tells us exactly which items have not been inserted.
        final MongoBulkWriteException bulkFailure = (MongoBulkWriteException) t;
        bulkFailure.getWriteErrors().forEach(writeError -> idxToFail.add(writeError.getIndex()));
        idxToAck.removeAll(idxToFail);
    }
    this.onComplete();
}
/**
 * Imports the notes found in the local (VFS) notebook repository into MongoDB.
 *
 * <p>According to the original documentation this is meant to run on startup when the
 * environment variable ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT is true (the check itself is not
 * part of this method). Notes that already exist in MongoDB are skipped and reported with a
 * WARN log.
 *
 * <p>When no local notes are found, nothing is sent to MongoDB, because {@code insertMany}
 * rejects an empty document list. The local repository is closed even if reading or
 * inserting the notes fails.
 *
 * @throws IOException if the local notes cannot be listed or read.
 */
private void insertFileSystemNotes() throws IOException {
  LinkedList<Document> docs = new LinkedList<>(); // docs to be imported

  OldNotebookRepo vfsRepo = new OldVFSNotebookRepo();
  vfsRepo.init(this.conf);
  try {
    // collect notes to be imported
    for (OldNoteInfo info : vfsRepo.list(null)) {
      Note note = vfsRepo.get(info.getId(), null);
      docs.add(noteToDocument(note));
    }

    if (docs.isEmpty()) {
      return; // nothing to import; insertMany would fail on an empty list
    }

    /*
     * 'ordered(false)' option allows to proceed bulk inserting even though
     * there are duplicated documents. The duplicated documents will be skipped
     * and print a WARN log.
     */
    try {
      coll.insertMany(docs, new InsertManyOptions().ordered(false));
    } catch (MongoBulkWriteException e) {
      printDuplicatedException(e); // print duplicated document warning log
    }
  } finally {
    vfsRepo.close(); // it does nothing for now but maybe in the future...
  }
}
/**
 * Inserts edges in bulk, skipping every edge whose insertion fails.
 *
 * <p>outV|label|inV is unique in ChronoGraph, so duplicated edges are not permitted. When a
 * bulk insert fails, the documents inserted before the failure are kept, the failing document
 * is skipped, and the insert is retried with the rest of the list.
 *
 * <p>A null or empty list is a no-op. This also covers the case where the failing document
 * was the last one: the remaining sublist is then empty and must not be passed to
 * {@code insertMany}, which rejects empty lists.
 *
 * @param edgeArray the edge documents to insert; use with Converter.getBsonDocumentEdge
 * @throws MongoBulkWriteException if the bulk write fails with a code other than -3
 */
public void addEdges(List<BsonDocument> edgeArray) {
	List<BsonDocument> remaining = edgeArray;
	while (remaining != null && !remaining.isEmpty()) {
		try {
			edges.insertMany(remaining);
			return;
		} catch (MongoBulkWriteException e) {
			// NOTE(review): -3 is presumably the generic code the driver assigns to
			// bulk write exceptions — confirm against the driver version in use.
			if (e.getCode() != -3) {
				throw e;
			}
			// Documents [0, inserted) were written; the one at index 'inserted' failed.
			int inserted = e.getWriteResult().getInsertedCount();
			remaining = remaining.subList(inserted + 1, remaining.size());
		}
	}
}
/**
 * Runs the stream-restart scenario with a mocked {@code MongoBulkWriteException} whose write
 * result is a mocked {@code BulkWriteResult}.
 */
@Test
public void streamIsRestartableAfterMongoBulkWriteException() throws Exception {
    final MongoBulkWriteException bulkFailure = Mockito.mock(MongoBulkWriteException.class);
    final BulkWriteResult mockedResult = Mockito.mock(BulkWriteResult.class);
    Mockito.when(bulkFailure.getWriteResult()).thenReturn(mockedResult);

    testStreamRestart(() -> bulkFailure);
}
/**
 * Logs a warning for every write error whose message contains a note ID.
 *
 * <p>Each error message of the given exception is searched for the first run of nine
 * characters from {@code [A-Z0-9]}, which is taken to be the ID of the note that was not
 * inserted. Errors whose message contains no such run are ignored.
 *
 * @param e the bulk write exception whose write errors are inspected
 */
private void printDuplicatedException(MongoBulkWriteException e) {
  // Compiled once per call instead of once per write error.
  Pattern noteIdPattern = Pattern.compile("[A-Z0-9]{9}"); // regex for note ID

  for (BulkWriteError error : e.getWriteErrors()) {
    Matcher matcher = noteIdPattern.matcher(error.getMessage());
    if (matcher.find()) { // the message contains a note ID
      String noteId = matcher.group();
      LOG.warn("Note " + noteId + " not inserted since already exists in MongoDB");
    }
  }
}
/**
 * Inserts the given documents into the "EventData" collection with an unordered bulk insert.
 *
 * @param bsonDocumentList the documents to insert
 * @return a map with key "eventCaptured" holding the size of the given list when the insert
 *         raised no {@code MongoBulkWriteException}, or a map with key "error" holding the
 *         exception message otherwise
 */
public HashMap<String, Object> capture(List<BsonDocument> bsonDocumentList) {
	final HashMap<String, Object> response = new HashMap<String, Object>();
	final MongoCollection<BsonDocument> eventData =
			Configuration.mongoDatabase.getCollection("EventData", BsonDocument.class);

	try {
		eventData.insertMany(bsonDocumentList, new InsertManyOptions().ordered(false));
		response.put("eventCaptured", bsonDocumentList.size());
	} catch (MongoBulkWriteException bulkFailure) {
		response.put("error", bulkFailure.getMessage());
	}
	return response;
}
/**
 * Bulk-inserts the given documents, unordered, into the "EventData" collection.
 *
 * @param bsonDocumentList the documents to insert
 * @return a map containing "error" (the exception message) if a
 *         {@code MongoBulkWriteException} was raised, otherwise a map containing
 *         "eventCaptured" (the size of the given list)
 */
public HashMap<String, Object> capture(List<BsonDocument> bsonDocumentList) {
	final MongoCollection<BsonDocument> eventData =
			Configuration.mongoDatabase.getCollection("EventData", BsonDocument.class);
	final HashMap<String, Object> outcome = new HashMap<String, Object>();

	final InsertManyOptions unordered = new InsertManyOptions().ordered(false);
	try {
		eventData.insertMany(bsonDocumentList, unordered);
	} catch (MongoBulkWriteException bulkFailure) {
		outcome.put("error", bulkFailure.getMessage());
		return outcome;
	}

	outcome.put("eventCaptured", bsonDocumentList.size());
	return outcome;
}
/**
 * Asserts that the given exception was caused by a duplicate (primary) key.
 *
 * <p>If the exception is not itself a {@code MongoException}, its cause is examined instead.
 * Three shapes are accepted, since Fongo and MongoDB report the condition differently:
 * <ul>
 *   <li>a {@code DuplicateKeyException} (thrown directly by Fongo);</li>
 *   <li>a {@code MongoCommandException} with codeName "DuplicateKey" and error code 11000;</li>
 *   <li>a {@code MongoBulkWriteException} with exactly one write error, whose code is 11000
 *       and whose message contains "duplicate key".</li>
 * </ul>
 * Anything else fails the assertion.
 *
 * @param exception the exception to inspect; must not be null
 */
public static void assertDuplicateKeyException(Throwable exception) {
  Preconditions.checkNotNull(exception, "exception");

  // unwrap, if necessary
  final Throwable actual = (exception instanceof MongoException) ? exception : exception.getCause();

  if (actual instanceof DuplicateKeyException) {
    // fongo throws DuplicateKeyException directly
    return;
  } else if (actual instanceof MongoCommandException) {
    // MongoDB throws a command exception carrying the duplicate-key code
    final MongoCommandException commandFailure = (MongoCommandException) actual;
    final String codeName = commandFailure.getResponse().get("codeName").asString().getValue();
    final int errorCode = commandFailure.getErrorCode();

    check(codeName).is("DuplicateKey");
    check(errorCode).is(11000); // code 11000 stands for DuplicateKeyException
    return;
  } else if (actual instanceof MongoBulkWriteException) {
    // bulk writes report the duplicate key as a single write error
    final List<BulkWriteError> writeErrors = ((MongoBulkWriteException) actual).getWriteErrors();
    check(writeErrors).hasSize(1);
    check(writeErrors.get(0).getCode()).is(11000);
    check(writeErrors.get(0).getMessage()).contains("duplicate key");
    return;
  }

  // reaching this point means no duplicate key exception was found
  fail("Should get duplicate key exception after " + actual);
}
/**
 * Builds a {@code WriteResultAndErrors} from the write result and write errors carried by a
 * {@code MongoBulkWriteException}.
 *
 * @param writeModels the requested write models.
 * @param mongoBulkWriteException the exception thrown by the bulk write.
 * @return the write result with errors.
 */
public static WriteResultAndErrors failure(final List<AbstractWriteModel> writeModels,
        final MongoBulkWriteException mongoBulkWriteException) {

    final BulkWriteResult partialResult = mongoBulkWriteException.getWriteResult();
    final List<BulkWriteError> writeErrors = mongoBulkWriteException.getWriteErrors();
    return new WriteResultAndErrors(writeModels, partialResult, writeErrors, null);
}