下面列出了 com.mongodb.ReadConcern 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Verifies that a {@code readconcernlevel} query parameter in the connection string
 * is reflected in the read concern of the parsed settings.
 *
 * @param readConcernString   the level name placed into the connection string
 * @param expectedReadConcern the read concern the parser is expected to produce
 */
@Parameters(method = "validReadConcernValues")
@Test
public void should_parse_read_concern_from_connection_string(String readConcernString, ReadConcern expectedReadConcern) throws Exception {
    // given: a config whose only entry is a connection string carrying the level
    final String connectionString =
        String.format("mongodb://localhost:27017/mydb?replicaSet=myRs&readconcernlevel=%s", readConcernString);
    final JsonObject config = new JsonObject();
    config.put("connection_string", connectionString);

    // when: the parser builds its settings from that config
    final MongoClientOptionsParser parser = new MongoClientOptionsParser(vertx, config);
    final ReadConcern actual = parser.settings().getReadConcern();

    // then
    assertEquals(expectedReadConcern, actual);
}
/**
 * Verifies that the {@code readConcernLevel} config property is used when the
 * connection string itself carries no read concern level.
 *
 * @param readConcernString   the level name stored under {@code readConcernLevel}
 * @param expectedReadConcern the read concern the parser is expected to produce
 */
@Parameters(method = "validReadConcernValues")
@Test
public void should_fallback_to_config_property_if_read_concern_not_present_in_connection_string(String readConcernString, ReadConcern expectedReadConcern) throws Exception {
    // given: connection string without a level, level supplied as a separate property
    final JsonObject config = new JsonObject();
    config.put("connection_string", "mongodb://localhost:27017/mydb?replicaSet=myRs");
    config.put("readConcernLevel", readConcernString);

    // when
    final MongoClientOptionsParser parser = new MongoClientOptionsParser(vertx, config);
    final ReadConcern actual = parser.settings().getReadConcern();

    // then
    assertEquals(expectedReadConcern, actual);
}
/**
 * Verifies that {@link ReadConcern#DEFAULT} is reported when neither the connection
 * string nor the config object specifies a read concern level.
 */
@Test
public void should_return_default_read_concern_in_case_of_missing_read_concern_in_connection_string_or_config_object() throws Exception {
    // given: a config holding nothing but a level-free connection string
    final JsonObject config = new JsonObject();
    config.put("connection_string", "mongodb://localhost:27017/mydb?replicaSet=myRs");

    // when
    final MongoClientOptionsParser parser = new MongoClientOptionsParser(vertx, config);
    final ReadConcern actual = parser.settings().getReadConcern();

    // then
    assertEquals(ReadConcern.DEFAULT, actual);
}
/**
 * Opens this instance against MongoDB.
 *
 * <p>Does nothing and returns {@code true} when already open. Otherwise it obtains a client
 * for {@code mongoClientUri}, starts a causally consistent session whose default transaction
 * options use the default read concern, majority write concern and nearest read preference,
 * and resolves the backing collection.
 *
 * @return the result of {@code isOpen()} after the session and collection have been set up
 */
@Override
public boolean open() {
    if (isOpen()) {
        return true;
    }

    LOG.info("Initializing MongoDB at {}", mongoClientUri);

    // The client comes from the shared connection manager; the session belongs to this instance.
    MongoClient client = MongoConnectionManager.inst().getMongoClientInstance(this.mongoClientUri);

    TransactionOptions defaultTxOptions =
        TransactionOptions.builder()
            .readConcern(ReadConcern.DEFAULT)
            .writeConcern(WriteConcern.MAJORITY)
            .readPreference(ReadPreference.nearest())
            .build();
    ClientSessionOptions.Builder sessionOptionsBuilder = ClientSessionOptions.builder();
    sessionOptionsBuilder.causallyConsistent(true);
    sessionOptionsBuilder.defaultTransactionOptions(defaultTxOptions);
    this.clientSession = client.startSession(sessionOptionsBuilder.build());

    // Mongo creates both the database and the collection on demand if they do not exist yet.
    MongoDatabase database = client.getDatabase(MongoConstants.AION_DB_NAME);
    this.collection = database.getCollection(this.name, BsonDocument.class);

    LOG.info("Finished opening the Mongo connection");
    return isOpen();
}
/**
 * Maps a read concern name to a {@link ReadConcern} constant, ignoring case.
 *
 * @param readConcern the name to map; may be {@code null}
 * @return {@link ReadConcern#LOCAL} for "LOCAL", {@link ReadConcern#MAJORITY} for "MAJORITY",
 *         and {@link ReadConcern#DEFAULT} for anything else (including {@code null})
 */
private ReadConcern getReadConcern(String readConcern) {
    final ReadConcern resolved;
    if ("LOCAL".equalsIgnoreCase(readConcern)) {
        resolved = ReadConcern.LOCAL;
    } else if ("MAJORITY".equalsIgnoreCase(readConcern)) {
        resolved = ReadConcern.MAJORITY;
    } else {
        resolved = ReadConcern.DEFAULT;
    }
    return resolved;
}
/**
 * Verifies that when a read concern level appears both in the connection string and as
 * the {@code readConcernLevel} property, the connection string value is the one used.
 */
@Test
public void should_prefer_read_concern_passed_via_connection_string_over_property_value() throws Exception {
    // given: "majority" in the connection string, "linearizable" as the property
    final JsonObject config = new JsonObject();
    config.put("connection_string", "mongodb://localhost:27017/mydb?replicaSet=myRs&readconcernlevel=majority");
    config.put("readConcernLevel", "linearizable");

    // when
    final MongoClientOptionsParser parser = new MongoClientOptionsParser(vertx, config);
    final ReadConcern actual = parser.settings().getReadConcern();

    // then: the connection string value is used
    assertEquals(ReadConcern.MAJORITY, actual);
}
/**
 * Supplies the parameter rows for the tests annotated with
 * {@code @Parameters(method = "validReadConcernValues")}.
 *
 * @return an array of rows, each holding a level name and the matching {@link ReadConcern}
 */
private Object[] validReadConcernValues() {
    final Object[] localCase = {"local", ReadConcern.LOCAL};
    final Object[] majorityCase = {"majority", ReadConcern.MAJORITY};
    final Object[] linearizableCase = {"linearizable", ReadConcern.LINEARIZABLE};
    return new Object[]{localCase, majorityCase, linearizableCase};
}
/**
 * Exercises the {@code planCacheStats} aggregation stage.
 *
 * <p>Seeds five documents into the {@code orders} collection, creates several overlapping
 * indexes, runs four queries against the collection, and finally asserts that the
 * plan-cache-stats aggregation yields a non-null list.
 */
@Test
public void testPlanCacheStats() {
    checkMinServerVersion(4.2);
    List<Document> list = List.of(
        parse("{ '_id' : 1, 'item' : 'abc', 'price' : NumberDecimal('12'), 'quantity' : 2, 'type': 'apparel' }"),
        parse("{ '_id' : 2, 'item' : 'jkl', 'price' : NumberDecimal('20'), 'quantity' : 1, 'type': 'electronics' }"),
        parse("{ '_id' : 3, 'item' : 'abc', 'price' : NumberDecimal('10'), 'quantity' : 5, 'type': 'apparel' }"),
        parse("{ '_id' : 4, 'item' : 'abc', 'price' : NumberDecimal('8'), 'quantity' : 10, 'type': 'apparel' }"),
        parse("{ '_id' : 5, 'item' : 'jkl', 'price' : NumberDecimal('15'), 'quantity' : 15, 'type': 'electronics' }"));

    MongoCollection<Document> orders = getDatabase().getCollection("orders");
    orders.insertMany(list);

    Assert.assertNotNull(orders.createIndex(new Document("item", 1)));
    Assert.assertNotNull(orders.createIndex(new Document("item", 1)
                                                .append("quantity", 1)));
    Assert.assertNotNull(orders.createIndex(new Document("item", 1)
                                                .append("price", 1),
        new IndexOptions()
            .partialFilterExpression(new Document("price", new Document("$gte", 10)))));
    Assert.assertNotNull(orders.createIndex(new Document("quantity", 1)));
    Assert.assertNotNull(orders.createIndex(new Document("quantity", 1)
                                                .append("type", 1)));

    // find() only builds a lazy FindIterable; no command reaches the server until the
    // iterable is consumed. Fetch the first result of each query so the server actually
    // plans and executes it before the plan cache is inspected.
    orders.find(parse(" { item: 'abc', price: { $gte: NumberDecimal('10') } }")).first();
    orders.find(parse(" { item: 'abc', price: { $gte: NumberDecimal('5') } }")).first();
    orders.find(parse(" { quantity: { $gte: 20 } } ")).first();
    orders.find(parse(" { quantity: { $gte: 5 }, type: 'apparel' } ")).first();

    List<Document> stats = getDs().aggregate(Order.class)
                                  .planCacheStats()
                                  .execute(Document.class, new AggregationOptions()
                                                               .readConcern(ReadConcern.LOCAL))
                                  .toList();

    Assert.assertNotNull(stats);
}
/**
 * Returns the read concern reported by the wrapped instance.
 *
 * @return the wrapped instance's read concern
 */
@Override
public ReadConcern getReadConcern() {
    final ReadConcern delegateReadConcern = wrapped.getReadConcern();
    return delegateReadConcern;
}
/**
 * Creates a new collection wrapper around the wrapped collection reconfigured with the
 * given read concern, reusing this instance's observable adapter.
 *
 * @param readConcern the read concern to apply
 * @return a new {@code MongoCollectionImpl} using {@code readConcern}
 */
@Override
public MongoCollection<TDocument> withReadConcern(final ReadConcern readConcern) {
    final MongoCollection<TDocument> reconfigured =
        new MongoCollectionImpl<TDocument>(
            wrapped.withReadConcern(readConcern),
            observableAdapter);
    return reconfigured;
}
/**
 * Exposes the read concern of the underlying wrapped instance.
 *
 * @return the read concern obtained from {@code wrapped}
 */
@Override
public ReadConcern getReadConcern() {
    final ReadConcern current = wrapped.getReadConcern();
    return current;
}
/**
 * Creates a new database wrapper around the wrapped database reconfigured with the
 * given read concern, reusing this instance's observable adapter.
 *
 * @param readConcern the read concern to apply
 * @return a new {@code MongoDatabaseImpl} using {@code readConcern}
 */
@Override
public MongoDatabase withReadConcern(final ReadConcern readConcern) {
    final MongoDatabase reconfigured =
        new MongoDatabaseImpl(
            wrapped.withReadConcern(readConcern),
            observableAdapter);
    return reconfigured;
}
/**
 * Delegates to the wrapped instance to obtain its read concern.
 *
 * @return the read concern of {@code wrapped}
 */
@Override
public ReadConcern getReadConcern() {
    final ReadConcern wrappedReadConcern = wrapped.getReadConcern();
    return wrappedReadConcern;
}
/**
 * Creates a new bucket wrapper around the wrapped bucket reconfigured with the given
 * read concern, reusing this instance's observable adapter.
 *
 * @param readConcern the read concern to apply
 * @return a new {@code GridFSBucketImpl} using {@code readConcern}
 */
@Override
public GridFSBucket withReadConcern(final ReadConcern readConcern) {
    final GridFSBucket reconfigured =
        new GridFSBucketImpl(
            wrapped.withReadConcern(readConcern),
            observableAdapter);
    return reconfigured;
}
/**
 * Runs a multi-database transaction against a containerised MongoDB replica set.
 *
 * <p>Taken from <a href="https://docs.mongodb.com/manual/core/transactions/">https://docs.mongodb.com</a>
 *
 * <p>The client and the session are managed with try-with-resources so that both are closed
 * even if seeding the collections or starting the session fails.
 */
@Test
public void shouldExecuteTransactions() {
    try (
        // creatingMongoDBContainer {
        final MongoDBContainer mongoDBContainer = new MongoDBContainer()
        // }
    ) {

        // startingMongoDBContainer {
        mongoDBContainer.start();
        // }

        final String mongoRsUrl = mongoDBContainer.getReplicaSetUrl();
        assertNotNull(mongoRsUrl);

        try (final MongoClient mongoSyncClient = MongoClients.create(mongoRsUrl)) {
            // Seed one document per collection with majority write concern before the transaction.
            mongoSyncClient.getDatabase("mydb1").getCollection("foo")
                .withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("abc", 0));
            mongoSyncClient.getDatabase("mydb2").getCollection("bar")
                .withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("xyz", 0));

            try (final ClientSession clientSession = mongoSyncClient.startSession()) {
                final TransactionOptions txnOptions = TransactionOptions.builder()
                    .readPreference(ReadPreference.primary())
                    .readConcern(ReadConcern.LOCAL)
                    .writeConcern(WriteConcern.MAJORITY)
                    .build();

                final String trxResult = "Inserted into collections in different databases";

                // Both inserts run inside the same session, and so inside the same transaction.
                TransactionBody<String> txnBody = () -> {
                    final MongoCollection<Document> coll1 =
                        mongoSyncClient.getDatabase("mydb1").getCollection("foo");
                    final MongoCollection<Document> coll2 =
                        mongoSyncClient.getDatabase("mydb2").getCollection("bar");

                    coll1.insertOne(clientSession, new Document("abc", 1));
                    coll2.insertOne(clientSession, new Document("xyz", 999));
                    return trxResult;
                };

                try {
                    final String trxResultActual = clientSession.withTransaction(txnBody, txnOptions);
                    assertEquals(trxResult, trxResultActual);
                } catch (RuntimeException re) {
                    // Only failures of the transaction itself are wrapped, with the cause preserved.
                    throw new IllegalStateException(re.getMessage(), re);
                }
            }
        }
    }
}
/**
 * Looks up, via reflection, the public {@code execute} method of {@code OperationExecutor}
 * that takes a {@code WriteOperation} and a {@code ReadConcern}.
 *
 * @return the matching {@link Method}
 * @throws Exception if the reflective lookup fails
 */
private Method getMethod() throws Exception {
    final Class<?>[] parameterTypes = {WriteOperation.class, ReadConcern.class};
    return OperationExecutor.class.getMethod("execute", parameterTypes);
}
/**
 * Returns the read concern reported by the wrapped instance.
 *
 * @return the wrapped instance's read concern
 */
@Override
public ReadConcern getReadConcern() {
    final ReadConcern delegateReadConcern = wrapped.getReadConcern();
    return delegateReadConcern;
}
/**
 * Creates a new collection wrapper around the wrapped collection reconfigured with the
 * given read concern.
 *
 * @param readConcern the read concern to apply
 * @return a new {@code MongoCollectionImpl} using {@code readConcern}
 */
@Override
public MongoCollection<TDocument> withReadConcern(final ReadConcern readConcern) {
    final MongoCollection<TDocument> reconfigured =
        new MongoCollectionImpl<TDocument>(wrapped.withReadConcern(readConcern));
    return reconfigured;
}
/**
 * Exposes the read concern of the underlying wrapped instance.
 *
 * @return the read concern obtained from {@code wrapped}
 */
@Override
public ReadConcern getReadConcern() {
    final ReadConcern current = wrapped.getReadConcern();
    return current;
}
/**
 * Creates a new database wrapper around the wrapped database reconfigured with the
 * given read concern.
 *
 * @param readConcern the read concern to apply
 * @return a new {@code MongoDatabaseImpl} using {@code readConcern}
 */
@Override
public MongoDatabase withReadConcern(final ReadConcern readConcern) {
    final MongoDatabase reconfigured =
        new MongoDatabaseImpl(wrapped.withReadConcern(readConcern));
    return reconfigured;
}
/**
 * Delegates to the wrapped instance to obtain its read concern.
 *
 * @return the read concern of {@code wrapped}
 */
@Override
public ReadConcern getReadConcern() {
    final ReadConcern wrappedReadConcern = wrapped.getReadConcern();
    return wrappedReadConcern;
}
/**
 * Creates a new bucket wrapper around the wrapped bucket reconfigured with the given
 * read concern.
 *
 * @param readConcern the read concern to apply
 * @return a new {@code GridFSBucketImpl} using {@code readConcern}
 */
@Override
public GridFSBucket withReadConcern(final ReadConcern readConcern) {
    final GridFSBucket reconfigured =
        new GridFSBucketImpl(wrapped.withReadConcern(readConcern));
    return reconfigured;
}
/**
 * Resolves the read concern, preferring the connection string over the config object.
 *
 * @return the read concern from the connection string when one is present there;
 *         otherwise whatever {@code tryToParseFromConfig()} yields
 */
Optional<ReadConcern> readConcern() {
    final Optional<ReadConcern> fromConnectionString = tryToParseFromConnectionString();
    if (fromConnectionString.isPresent()) {
        return lift(fromConnectionString.get());
    }
    return tryToParseFromConfig();
}
/**
 * Wraps a possibly-{@code null} read concern in an {@link Optional}.
 *
 * @param readConcern the value to wrap; may be {@code null}
 * @return an {@code Optional} holding {@code readConcern}, or empty when it is {@code null}
 */
private Optional<ReadConcern> lift(ReadConcern readConcern) {
    final Optional<ReadConcern> lifted = Optional.ofNullable(readConcern);
    return lifted;
}
/**
 * Reads the read concern from the connection string, if there is one.
 *
 * @return empty when {@code connectionString} is {@code null} or reports a {@code null}
 *         read concern; otherwise an {@code Optional} holding that read concern
 */
private Optional<ReadConcern> tryToParseFromConnectionString() {
    if (connectionString == null) {
        return Optional.empty();
    }
    return Optional.ofNullable(connectionString.getReadConcern());
}
/**
 * Builds a read concern from the {@code readConcernLevel} entry of the config object.
 *
 * @return empty when {@code config} is {@code null} or has no {@code readConcernLevel}
 *         string; otherwise a {@code ReadConcern} created from the parsed level
 */
private Optional<ReadConcern> tryToParseFromConfig() {
    if (config == null) {
        return Optional.empty();
    }
    final String configuredLevel = config.getString("readConcernLevel");
    return Optional.ofNullable(configuredLevel)
        .map(levelName -> ReadConcernLevel.fromString(levelName))
        .map(level -> new ReadConcern(level));
}
/**
 * Returns the read concern held as {@code readConcern}.
 *
 * @return the current value of {@code readConcern}
 */
@Override
public ReadConcern getReadConcern() {
    final ReadConcern current = readConcern;
    return current;
}
/**
 * Returns the configured read concern.
 *
 * @return the value currently held in {@code readConcern}
 */
public ReadConcern getReadConcern() {
    final ReadConcern configured = readConcern;
    return configured;
}
/**
 * Returns the configured read concern.
 *
 * @return the value currently held in {@code readConcern}
 */
public ReadConcern readConcern() {
    final ReadConcern configured = readConcern;
    return configured;
}
/**
 * Runs {@code scan} over the driver's {@code com.mongodb.AggregationOptions} and the local
 * {@code AggregationOptions}, passing {@code false} and the {@code ReadConcern},
 * {@code ReadPreference} and {@code WriteConcern} classes.
 *
 * <p>NOTE(review): presumably {@code scan} compares the two APIs and the class list names
 * parameter types that get special treatment; confirm against the definition of {@code scan},
 * which is not visible here.
 */
@Test
public void aggregationOptions() {
scan(com.mongodb.AggregationOptions.class, AggregationOptions.class, false, List.of(ReadConcern.class, ReadPreference.class,
WriteConcern.class));
}