下面列出了 com.mongodb.MongoCommandException 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Builds a source that creates the capped collection {@code collectionName} in {@code database}.
 * The collection is capped to {@code cappedCollectionSizeInBytes} bytes and to a single document.
 * A {@link MongoCommandException} that signals "collection already exists" is recovered into a
 * single {@code Success} element; any other failure is propagated.
 *
 * @param database the database in which the collection is created.
 * @param collectionName the name of the capped collection.
 * @param cappedCollectionSizeInBytes the maximum size of the capped collection in bytes.
 * @return a source emitting {@code Success} once the collection exists.
 */
private static Source<Success, NotUsed> repeatableCreateCappedCollectionSource(
        final MongoDatabase database,
        final String collectionName,
        final long cappedCollectionSizeInBytes) {

    final CreateCollectionOptions cappedOptions = new CreateCollectionOptions();
    cappedOptions.capped(true);
    cappedOptions.sizeInBytes(cappedCollectionSizeInBytes);
    cappedOptions.maxDocuments(1);

    // Source.lazily wraps the create command so that the publisher is only obtained inside the creator.
    final Source<Success, NotUsed> createCollection = Source
            .lazily(() -> Source.fromPublisher(database.createCollection(collectionName, cappedOptions)))
            .mapMaterializedValue(ignored -> NotUsed.getInstance())
            .withAttributes(Attributes.inputBuffer(1, 1));

    // Recover at most once, and only from the "collection already exists" command error.
    return createCollection.recoverWithRetries(1, new PFBuilder<Throwable, Source<Success, NotUsed>>()
            .match(MongoCommandException.class,
                    MongoTimestampPersistence::isCollectionAlreadyExistsError,
                    alreadyExists -> Source.single(Success.SUCCESS))
            .build());
}
/**
 * Initializing a collection with an index whose name is already taken by an index with
 * conflicting options must fail with the index-options-conflict error code. Indices listed
 * before the conflicting one are still created, those listed after it are not.
 */
@Test
public void initializeFailsWhenConflictingIndexWithSameNameAlreadyExists() {
    // GIVEN: a collection that already carries INDEX_FOO
    final String collectionName = "conflictingIndexWithSameName";
    final List<Index> existingIndices = Collections.singletonList(INDEX_FOO);
    initialize(collectionName, existingIndices);
    assertIndices(collectionName, existingIndices);

    // WHEN / THEN: re-initializing with a conflicting definition of FOO fails
    final List<Index> conflictingIndices =
            Arrays.asList(INDEX_BAR, INDEX_FOO_CONFLICTING_NAME_OPTION, INDEX_BAZ);
    assertThatExceptionOfType(MongoCommandException.class)
            .isThrownBy(() -> initialize(collectionName, conflictingIndices))
            .satisfies(error -> assertThat(error.getErrorCode())
                    .isEqualTo(MONGO_INDEX_OPTIONS_CONFLICT_ERROR_CODE));

    // BAR precedes the conflicting index and has therefore been created; BAZ follows it and has not.
    final List<Index> expectedIndices = Arrays.asList(INDEX_BAR, INDEX_FOO);
    assertIndices(collectionName, expectedIndices);
}
/**
 * Applies the given document validation settings to the collection of the mapped class.
 * The settings are first applied with a {@code collMod} command; if that command fails with
 * error code 26, the collection is created with the same validation options instead.
 * Does nothing when {@code validation} is {@code null}.
 *
 * @param mc         the mapped class whose collection is configured
 * @param validation the validation settings to apply, may be {@code null}
 * @throws MongoCommandException if {@code collMod} fails with any code other than 26
 */
void enableValidation(final MappedClass mc, final Validation validation) {
    if (validation == null) {
        return;
    }

    final String targetCollection = mc.getCollectionName();
    try {
        getDatabase().runCommand(new Document("collMod", targetCollection)
            .append("validator", parse(validation.value()))
            .append("validationLevel", validation.level().getValue())
            .append("validationAction", validation.action().getValue()));
    } catch (MongoCommandException e) {
        // Only code 26 is handled here (NamespaceNotFound in MongoDB's error code table);
        // every other command failure is passed on to the caller unchanged.
        if (e.getCode() != 26) {
            throw e;
        }
        getDatabase().createCollection(targetCollection,
            new CreateCollectionOptions()
                .validationOptions(new ValidationOptions()
                    .validator(parse(validation.value()))
                    .validationLevel(validation.level())
                    .validationAction(validation.action())));
    }
}
/**
 * Checks that a modify operation is rejected while document validation is enforced and
 * succeeds once validation is bypassed through the {@code ModifyOptions}.
 */
@Test
public void findAndModify() {
    getMapper().map(DocumentValidation.class);
    getDs().enableDocumentValidation();
    getDs().save(new DocumentValidation("Harold", 100, new Date()));

    Query<DocumentValidation> allDocuments = getDs().find(DocumentValidation.class);
    ModifyOptions modifyOptions = new ModifyOptions().bypassDocumentValidation(false);
    Modify<DocumentValidation> setNumberToFive = allDocuments.modify(set("number", 5));

    try {
        setNumberToFive.execute(modifyOptions);
        fail("Document validation should have complained");
    } catch (MongoCommandException expected) {
        // expected: validation is enabled and was not bypassed for this attempt
    }

    // Same modification again, this time bypassing validation.
    modifyOptions.bypassDocumentValidation(true);
    setNumberToFive.execute(modifyOptions);

    FindOptions firstOnly = new FindOptions().limit(1);
    Assert.assertNotNull(allDocuments.filter(eq("number", 5)).iterator(firstOnly).next());
}
/**
 * Tries to open a change stream cursor, optionally resuming from {@code resumeToken}.
 *
 * <p>With a resume token, {@code startAfter} is used while {@code supportsStartAfter} is set;
 * otherwise {@code resumeAfter} is used unless the cursor has been flagged as invalidated.
 * Two command failures trigger a retry when a resume token was supplied:
 * <ul>
 *   <li>error code 260: the cursor is flagged as invalidated and the stream is reopened
 *       without a resume token;</li>
 *   <li>error code 9 or 40415 with a message mentioning {@code startAfter}:
 *       {@code supportsStartAfter} is cleared and the attempt is repeated with the same token.</li>
 * </ul>
 *
 * @param sourceConfig the source configuration used to build the change stream
 * @param mongoClient  the client used to build the change stream
 * @param resumeToken  the token to resume from, or {@code null} to start a fresh stream
 * @return the opened cursor, or {@code null} if the change stream could not be created
 */
private MongoCursor<BsonDocument> tryCreateCursor(
        final MongoSourceConfig sourceConfig,
        final MongoClient mongoClient,
        final BsonDocument resumeToken) {
    try {
        final ChangeStreamIterable<Document> changeStream =
                getChangeStreamIterable(sourceConfig, mongoClient);
        final boolean hasResumeToken = resumeToken != null;

        if (hasResumeToken && supportsStartAfter) {
            LOGGER.info("Resuming the change stream after the previous offset: {}", resumeToken);
            changeStream.startAfter(resumeToken);
        } else if (hasResumeToken && !invalidatedCursor) {
            LOGGER.info("Resuming the change stream after the previous offset using resumeAfter.");
            changeStream.resumeAfter(resumeToken);
        } else {
            LOGGER.info("New change stream cursor created without offset.");
        }
        return changeStream.withDocumentClass(BsonDocument.class).iterator();
    } catch (MongoCommandException e) {
        if (resumeToken != null) {
            final int errorCode = e.getErrorCode();
            if (errorCode == 260) {
                // Remember the invalidation and start over without any offset.
                invalidatedCursor = true;
                return tryCreateCursor(sourceConfig, mongoClient, null);
            }
            final boolean startAfterRejected = (errorCode == 9 || errorCode == 40415)
                    && e.getErrorMessage().contains("startAfter");
            if (startAfterRejected) {
                // Retry with the same token; the next attempt no longer takes the startAfter branch.
                supportsStartAfter = false;
                return tryCreateCursor(sourceConfig, mongoClient, resumeToken);
            }
        }
        LOGGER.info("Failed to resume change stream: {} {}", e.getErrorMessage(), e.getErrorCode());
        return null;
    }
}
/**
 * Builds the recovery function used when dropping an index: a {@link MongoCommandException}
 * accepted by {@code IndexOperations::isIndexNotFound} is logged at debug level and turned
 * into a single {@code Success} element. Other throwables are not matched.
 *
 * @param indexDescription description of the index, used in the log message only.
 * @return the partial function performing the recovery.
 */
private static PartialFunction<Throwable, Source<Success, NotUsed>> buildDropIndexRecovery(
        final String indexDescription) {
    final PFBuilder<Throwable, Source<Success, NotUsed>> recovery = new PFBuilder<>();
    return recovery
            .match(MongoCommandException.class, IndexOperations::isIndexNotFound, indexMissing -> {
                LOGGER.debug("Index <{}> could not be dropped because it does not exist (anymore).",
                        indexDescription);
                return Source.single(Success.SUCCESS);
            })
            .build();
}
/**
 * Drops the collection identified by {@code namespace} and returns a handle to it.
 * The drop command is awaited for at most 10 seconds. A command failure whose message
 * starts with "ns not found" is ignored; any other command failure is rethrown.
 *
 * @param namespace the database and collection name
 * @return the collection handle obtained from the database
 * @throws Throwable if the drop command fails for another reason
 */
public static MongoCollection<Document> initializeCollection(final MongoNamespace namespace) throws Throwable {
    final MongoDatabase db = getMongoClient().getDatabase(namespace.getDatabaseName());
    final String collectionName = namespace.getCollectionName();
    try {
        db.runCommand(new Document("drop", collectionName))
                .timeout(10, SECONDS)
                .toBlocking()
                .first();
    } catch (MongoCommandException e) {
        final boolean collectionMissing = e.getErrorMessage().startsWith("ns not found");
        if (!collectionMissing) {
            throw e;
        }
    }
    return db.getCollection(collectionName);
}
/**
 * Drops the database with the given name, waiting at most 10 seconds for the command.
 * Does nothing when {@code name} is {@code null}. A command failure whose message starts
 * with "ns not found" is ignored; any other command failure is rethrown.
 *
 * @param name the database name, may be {@code null}
 * @throws Throwable if the dropDatabase command fails for another reason
 */
public static void dropDatabase(final String name) throws Throwable {
    if (name != null) {
        try {
            getMongoClient().getDatabase(name)
                    .runCommand(new Document("dropDatabase", 1))
                    .timeout(10, SECONDS)
                    .toBlocking()
                    .first();
        } catch (MongoCommandException e) {
            final boolean namespaceMissing = e.getErrorMessage().startsWith("ns not found");
            if (!namespaceMissing) {
                throw e;
            }
        }
    }
}
/**
 * Drops the collection identified by {@code namespace}, waiting at most 10 seconds for
 * the command. A command failure whose message contains "ns not found" is ignored;
 * any other command failure is rethrown.
 *
 * @param namespace the database and collection name
 * @throws Throwable if the drop command fails for another reason
 */
public static void drop(final MongoNamespace namespace) throws Throwable {
    try {
        final Document dropCommand = new Document("drop", namespace.getCollectionName());
        getMongoClient().getDatabase(namespace.getDatabaseName())
                .runCommand(dropCommand)
                .timeout(10, SECONDS)
                .toBlocking()
                .first();
    } catch (MongoCommandException e) {
        final boolean collectionMissing = e.getErrorMessage().contains("ns not found");
        if (!collectionMissing) {
            throw e;
        }
    }
}
/**
 * Drops the collection identified by {@code namespace} and returns a handle to it.
 * The drop command is subscribed to and awaited for at most 10 seconds. A command failure
 * whose message starts with "ns not found" is ignored; any other command failure is rethrown.
 *
 * @param namespace the database and collection name
 * @return the collection handle obtained from the database
 * @throws Throwable if the drop command fails for another reason
 */
public static MongoCollection<Document> initializeCollection(final MongoNamespace namespace) throws Throwable {
    final MongoDatabase db = getMongoClient().getDatabase(namespace.getDatabaseName());
    final String collectionName = namespace.getCollectionName();
    try {
        final ObservableSubscriber<Document> dropResult = new ObservableSubscriber<Document>();
        db.runCommand(new Document("drop", collectionName)).subscribe(dropResult);
        dropResult.await(10, SECONDS);
    } catch (MongoCommandException e) {
        final boolean collectionMissing = e.getErrorMessage().startsWith("ns not found");
        if (!collectionMissing) {
            throw e;
        }
    }
    return db.getCollection(collectionName);
}
/**
 * Drops the database with the given name, waiting at most 10 seconds for the command.
 * Does nothing when {@code name} is {@code null}. A command failure whose message starts
 * with "ns not found" is ignored; any other command failure is rethrown.
 *
 * @param name the database name, may be {@code null}
 * @throws Throwable if the dropDatabase command fails for another reason
 */
public static void dropDatabase(final String name) throws Throwable {
    if (name != null) {
        try {
            final ObservableSubscriber<Document> dropResult = new ObservableSubscriber<Document>();
            getMongoClient().getDatabase(name)
                    .runCommand(new Document("dropDatabase", 1))
                    .subscribe(dropResult);
            dropResult.await(10, SECONDS);
        } catch (MongoCommandException e) {
            final boolean namespaceMissing = e.getErrorMessage().startsWith("ns not found");
            if (!namespaceMissing) {
                throw e;
            }
        }
    }
}
/**
 * Drops the collection identified by {@code namespace}, waiting at most 20 seconds for
 * the command. A command failure whose message contains "ns not found" is ignored;
 * any other command failure is rethrown.
 *
 * @param namespace the database and collection name
 * @throws Throwable if the drop command fails for another reason
 */
public static void drop(final MongoNamespace namespace) throws Throwable {
    try {
        final ObservableSubscriber<Document> dropResult = new ObservableSubscriber<Document>();
        getMongoClient().getDatabase(namespace.getDatabaseName())
                .runCommand(new Document("drop", namespace.getCollectionName()))
                .subscribe(dropResult);
        dropResult.await(20, SECONDS);
    } catch (MongoCommandException e) {
        final boolean collectionMissing = e.getErrorMessage().contains("ns not found");
        if (!collectionMissing) {
            throw e;
        }
    }
}
/**
 * Ensures current exception has been generated due to a duplicate (primary) key.
 * Differentiates between Fongo and Mongo exceptions since the behaviour under these databases
 * is different.
 *
 * <p>If {@code exception} is not itself a {@link MongoException}, its cause chain is searched
 * for the first {@link MongoException}. The assertion fails when no duplicate key error is found.
 *
 * @param exception the exception to inspect (possibly a wrapper around the Mongo exception); not null
 */
public static void assertDuplicateKeyException(Throwable exception) {
  Preconditions.checkNotNull(exception, "exception");

  // unwrap, if necessary: the driver exception may be nested inside one or more wrappers
  Throwable mongoError = exception;
  while (mongoError != null && !(mongoError instanceof MongoException)) {
    mongoError = mongoError.getCause();
  }

  // fongo throws directly DuplicateKeyException
  if (mongoError instanceof DuplicateKeyException) {
    return;
  }

  // MongoDB throws custom exception
  if (mongoError instanceof MongoCommandException) {
    final MongoCommandException commandError = (MongoCommandException) mongoError;
    // Only verify codeName when the response carries it; a response without that field
    // would otherwise abort this helper with a NullPointerException instead of an assertion.
    if (commandError.getResponse().containsKey("codeName")) {
      String codeName = commandError.getResponse().getString("codeName").getValue();
      check(codeName).is("DuplicateKey");
    }
    check(commandError.getErrorCode()).is(11000); // code 11000 stands for DuplicateKeyException
    // all good here (can return)
    return;
  }

  // for bulk writes as well
  if (mongoError instanceof MongoBulkWriteException) {
    List<BulkWriteError> errors = ((MongoBulkWriteException) mongoError).getWriteErrors();
    check(errors).hasSize(1);
    check(errors.get(0).getCode()).is(11000);
    check(errors.get(0).getMessage()).contains("duplicate key");
    return;
  }

  // if we got here means there is a problem (no duplicate key exception);
  // report the original exception when no Mongo exception could be unwrapped from it
  fail("Should get duplicate key exception after " + (mongoError != null ? mongoError : exception));
}
/**
 * Ensuring indexes for {@code MultipleTextIndexes} is expected to fail with a
 * {@link MongoCommandException}. The collection is dropped first so the indexes are
 * built from scratch.
 */
@Test(expected = MongoCommandException.class)
public void shouldNotAllowMultipleTextIndexes() {
    getMapper().map(MultipleTextIndexes.class);
    getMapper().getCollection(MultipleTextIndexes.class).drop();

    getDs().ensureIndexes();
}
/**
 * For a command whose first value is not a collection name, the span is named after the
 * command itself ("dropUser") and not after the command's string argument.
 */
@Test public void defaultSpanNameIsCommandName_nonCollectionCommand() {
  // We are trying to drop a user that doesn't exist, so the command is expected to fail.
  BsonDocument dropMissingUser = new BsonDocument("dropUser", new BsonString("testUser"));
  try {
    database.runCommand(dropMissingUser);
    failBecauseExceptionWasNotThrown(MongoCommandException.class);
  } catch (MongoCommandException e) {
    MutableSpan errorSpan = testSpanHandler.takeRemoteSpanWithError(CLIENT, e);

    // "testUser" should not be mistaken as a collection name
    assertThat(errorSpan.name()).isEqualTo("dropUser");
  }
}
/**
 * Tells whether the command failed with error code 27, which this class treats as
 * "index not found".
 *
 * @param e the command exception to inspect.
 * @return {@code true} if the error code is 27.
 */
private static boolean isIndexNotFound(final MongoCommandException e) {
    final int indexNotFoundCode = 27;
    return indexNotFoundCode == e.getErrorCode();
}
/**
 * Tells whether the command failed with {@code COLLECTION_ALREADY_EXISTS_ERROR_CODE}.
 *
 * @param error the command exception to inspect.
 * @return {@code true} if the error code equals {@code COLLECTION_ALREADY_EXISTS_ERROR_CODE}.
 */
private static boolean isCollectionAlreadyExistsError(final MongoCommandException error) {
    final int actualCode = error.getErrorCode();
    return COLLECTION_ALREADY_EXISTS_ERROR_CODE == actualCode;
}