下面列出了怎么用com.mongodb.ReadPreference的API类实例代码及写法,或者点击链接到github查看源代码。
public static ReadPreferenceChoice getReadPreferenceChoice( ReadPreference readPref )
{
if( readPref == null )
return PRIMARY; // default
String readPrefName = readPref.getName();
if( readPrefName == ReadPreference.primary().getName() )
return PRIMARY;
if( readPrefName == ReadPreference.primaryPreferred().getName() )
return PRIMARY_PREFERRED;
if( readPrefName == ReadPreference.secondary().getName() )
return SECONDARY;
if( readPrefName == ReadPreference.secondaryPreferred().getName() )
return SECONDARY_PREFERRED;
if( readPrefName == ReadPreference.nearest().getName() )
return NEAREST;
return PRIMARY; // default
}
private static MongoClient getClient(Boolean useCredentials, String replicaSet) {
MongoClientSettings.Builder settings = MongoClientSettings.builder();
if (useCredentials) {
MongoCredential credentials = MongoCredential.createCredential(
USER, ADMIN_DB, PASSWORD.toCharArray());
settings.credential(credentials);
}
StringBuilder connectionString = new StringBuilder(String.format("mongodb://%s:%d", HOST, PORT));
if (replicaSet != null) {
connectionString.append(String.format("/?replicaSet=%s", REPLICA_SET));
}
ConnectionString uri = new ConnectionString(connectionString.toString());
settings.applyConnectionString(uri);
settings.readPreference(ReadPreference.primaryPreferred());
return MongoClients.create(settings.build());
}
private MongoHealthChecker() {
final DefaultMongoDbConfig mongoDbConfig = DefaultMongoDbConfig.of(
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config()));
mongoClient = MongoClientWrapper.getBuilder(mongoDbConfig)
.connectionPoolMaxSize(HEALTH_CHECK_MAX_POOL_SIZE)
.build();
/*
* It's important to have the read preferences to primary preferred because the replication is to slow to retrieve
* the inserted document from a secondary directly after inserting it on the primary.
*/
collection = mongoClient.getCollection(TEST_COLLECTION_NAME)
.withReadPreference(ReadPreference.primaryPreferred());
materializer = ActorMaterializer.create(getContext());
}
private void prepareClient() {
try {
ServerAddress address = new ServerAddress(config.getMongo().getHost(), config.getMongo().getPort());
MongoClientOptions options = MongoClientOptions.builder()
.serverSelectionTimeout(5000)
.socketKeepAlive(false)
.readPreference(ReadPreference.primaryPreferred())
.sslInvalidHostNameAllowed(true)
.build();
client = connectToClient(address, options);
} catch (Exception ex) {
logger.error(ex.getMessage(), ex);
System.exit(-1);
}
}
@Test
public void testConnStringReadPreferenceTags() {
final ConnectionString connString = new ConnectionString("mongodb://localhost:27017/mydb?replicaSet=myapp" +
"&readPreference=nearest" +
"&readPreferenceTags=dc:ny,rack:1" +
"&readPreferenceTags=dc:ny" +
"&readPreferenceTags=");
List<TagSet> tagSets = new ArrayList<>();
List<Tag> tags = new ArrayList<>();
tags.add(new Tag("dc", "ny"));
tags.add(new Tag("rack", "1"));
tagSets.add(new TagSet(tags));
tags = new ArrayList<>();
tags.add(new Tag("dc", "ny"));
tagSets.add(new TagSet(tags));
tagSets.add(new TagSet());
ReadPreference expected = ReadPreference.valueOf("nearest", tagSets);
ReadPreference rp = new ReadPreferenceParser(connString, new JsonObject()).readPreference();
assertNotNull(rp);
assertEquals(expected, rp);
}
public void init(
Stage.Context context,
List<Stage.ConfigIssue> issues,
ReadPreference readPreference,
WriteConcern writeConcern
) {
mongoClient = createClient(context, issues, readPreference, writeConcern);
if (!issues.isEmpty()) {
return;
}
mongoDatabase = createMongoDatabase(context, issues, readPreference, writeConcern);
if (!issues.isEmpty()) {
return;
}
mongoCollection = createMongoCollection(context, issues, readPreference, writeConcern);
}
private MongoCollection createMongoCollection(
Stage.Context context,
List<Stage.ConfigIssue> issues,
ReadPreference readPreference,
WriteConcern writeConcern
) {
MongoCollection mongoCollection = null;
try {
if (readPreference != null) {
mongoCollection = mongoDatabase.getCollection(collection).withReadPreference(readPreference);
} else if (writeConcern != null) {
mongoCollection = mongoDatabase.getCollection(collection).withWriteConcern(writeConcern);
}
} catch (MongoClientException e) {
issues.add(context.createConfigIssue(
Groups.MONGODB.name(),
MONGO_CONFIG_PREFIX + "collection",
Errors.MONGODB_03,
collection,
e.toString()
));
}
return mongoCollection;
}
@Override
public void after(Object target, Object[] args, Object result, Throwable throwable) {
if (isDebug) {
logger.afterInterceptor(target, args, result, throwable);
}
if (args == null) {
return;
}
DatabaseInfo databaseInfo = DatabaseInfoUtils.getDatabaseInfo(target, UnKnownDatabaseInfo.MONGO_INSTANCE);
String readPreference = ((ReadPreference) args[0]).getName();
databaseInfo = new MongoDatabaseInfo(databaseInfo.getType(), databaseInfo.getExecuteQueryType()
, databaseInfo.getRealUrl(), databaseInfo.getUrl(), databaseInfo.getHost(), databaseInfo.getDatabaseId()
, ((MongoDatabaseInfo) databaseInfo).getCollectionName(), readPreference, ((MongoDatabaseInfo) databaseInfo).getWriteConcern());
if (result instanceof DatabaseInfoAccessor) {
((DatabaseInfoAccessor) result)._$PINPOINT$_setDatabaseInfo(databaseInfo);
}
}
/**
* Test when an empty entry is specified in a tag set is set in the properties.
*
*/
@Test
public void testEmptyEntry() {
secondaryReadRepository.setFailOnPrimary(false);
String tagSetProperty = "[{\"name\" : \"}, , {\"name1\" : \"value1\", \"name2\" : \"value2\"}]";
secondaryReadRepository.setTagSet(tagSetProperty);
DBObject[] tags = secondaryReadRepository.getTagSetsFromProperty();
Assert.assertNull(tags);
secondaryReadRepository.setReadProperties();
Mockito.verify(template).setReadPreference(ReadPreference.secondaryPreferred());
}
public MongoConfig(DataService dataService, String configId, Map<String, String> properties, boolean odataEnable)
throws DataServiceFault {
super(dataService, configId, DBConstants.DataSourceTypes.MONGODB, properties, odataEnable);
String serversParam = properties.get(DBConstants.MongoDB.SERVERS);
if (DBUtils.isEmptyString(serversParam)) {
throw new DataServiceFault("The data source param '" + DBConstants.MongoDB.SERVERS + "' is required");
}
this.servers = serversParam.split(",");
String database = properties.get(DBConstants.MongoDB.DATABASE);
if (DBUtils.isEmptyString(database)) {
throw new DataServiceFault("The data source param '" + DBConstants.MongoDB.DATABASE + "' is required");
}
try {
this.mongoClientOptions = extractMongoOptions(properties);
this.mongoClient = createNewMongo(properties);
String writeConcern = properties.get(DBConstants.MongoDB.WRITE_CONCERN);
if (!DBUtils.isEmptyString(writeConcern)) {
this.getMongoClient().setWriteConcern(WriteConcern.valueOf(writeConcern));
}
String readPref = properties.get(DBConstants.MongoDB.READ_PREFERENCE);
if (!DBUtils.isEmptyString(readPref)) {
this.getMongoClient().setReadPreference(ReadPreference.valueOf(readPref));
}
this.getMongoClient().getDatabase(database);
this.jongo = new Jongo(this.getMongoClient().getDB(database));
} catch (Exception e) {
throw new DataServiceFault(e, DBConstants.FaultCodes.CONNECTION_UNAVAILABLE_ERROR, e.getMessage());
}
}
private MongoClientWrapperBuilder() {
mongoClientSettingsBuilder = MongoClientSettings.builder();
mongoClientSettingsBuilder.readPreference(ReadPreference.primaryPreferred());
dittoMongoClientSettingsBuilder = DittoMongoClientSettings.getBuilder();
connectionString = null;
defaultDatabaseName = null;
sslEnabled = false;
eventLoopGroup = null;
}
/**
* Initializes the things search persistence with a passed in {@code persistence}.
*
* @param mongoClient the mongoDB persistence wrapper.
* @param actorSystem the Akka ActorSystem.
* @since 1.0.0
*/
public MongoThingsSearchPersistence(final DittoMongoClient mongoClient, final ActorSystem actorSystem) {
final MongoDatabase database = mongoClient.getDefaultDatabase();
// configure search persistence to stress the primary as little as possible and tolerate inconsistency
collection = database
.getCollection(PersistenceConstants.THINGS_COLLECTION_NAME)
.withReadPreference(ReadPreference.secondaryPreferred());
log = Logging.getLogger(actorSystem, getClass());
final ActorMaterializer materializer = ActorMaterializer.create(actorSystem);
indexInitializer = IndexInitializer.of(database, materializer);
maxQueryTime = mongoClient.getDittoSettings().getMaxQueryTime();
hints = MongoHints.empty();
}
private void configureClientOptions(final Map<String, Object> properties) {
final MongoClientOptions.Builder builder = MongoClientOptions.builder();
final String writeConcern = (String) properties.get(ECLIPSELINK_NOSQL_PROPERTY_MONGO_WRITE_CONCERN);
final String readPreference = (String) properties.get(ECLIPSELINK_NOSQL_PROPERTY_MONGO_READ_PREFERENCE);
if (writeConcern != null) {
builder.writeConcern(WriteConcern.valueOf(writeConcern));
}
if (readPreference != null) {
builder.readPreference(ReadPreference.valueOf(readPreference));
}
mongoClientOptions = builder.build();
}
@Test
public void testMongoClientOptions() {
// GIVEN
final Map<String, Object> properties = new HashMap<>();
when(descriptor.getProperties()).thenReturn(properties);
properties.put("eclipselink.nosql.property.mongo.db", "foo");
// it looks like only the two options below are supported by EclipseLink
final ReadPreference readPreference = ReadPreference.nearest();
final WriteConcern writeConcern = WriteConcern.JOURNALED;
properties.put("eclipselink.nosql.property.mongo.read-preference", readPreference.getName());
properties.put("eclipselink.nosql.property.mongo.write-concern", "JOURNALED");
final ConfigurationFactory factory = new ConfigurationFactoryImpl();
// WHEN
final Configuration configuration = factory.createConfiguration(descriptor);
// THEN
assertThat(configuration, notNullValue());
final MongoClientOptions clientOptions = configuration.getClientOptions();
assertThat(clientOptions, notNullValue());
assertThat(clientOptions.getReadPreference(), equalTo(readPreference));
assertThat(clientOptions.getWriteConcern(), equalTo(writeConcern));
}
private static void externalizePropValues( Map<String,Object> propertiesMap )
{
// convert property value object not serializable by BSON serializer
// to its string representation
Object propValue = propertiesMap.get( QUERY_READ_PREF_PROP );
if( propValue instanceof ReadPreference )
propertiesMap.put( QUERY_READ_PREF_PROP, ((ReadPreference) propValue).getName() );
propValue = propertiesMap.get( QUERY_OPERATION_TYPE_PROP );
if( propValue instanceof CommandOperationType )
propertiesMap.put( QUERY_OPERATION_TYPE_PROP, propValue.toString() );
}
@Override
public <U> Iterator<U> aggregate(final String collectionName, final Class<U> target,
final AggregationOptions options,
final ReadPreference readPreference) {
LOG.debug("stages = " + stages);
AggregateIterable<U> cursor = collection.aggregate(stages, target);
return cursor.iterator();
}
/** @return The {@link DBObject} representing the object with the given id */
public static DBObject getById(AbstractMongoDBServer entity, String id) {
LOG.info("Getting {} from {}", new Object[]{id, entity});
MongoClient mongoClient = clientForServer(entity);
// Secondary preferred means the driver will let us read from secondaries too.
mongoClient.setReadPreference(ReadPreference.secondaryPreferred());
try {
DB db = mongoClient.getDB(TEST_DB);
DBCollection testCollection = db.getCollection(TEST_COLLECTION);
return testCollection.findOne(new BasicDBObject("_id", new ObjectId(id)));
} finally {
mongoClient.close();
}
}
/**
* Test read preference when multiple tag-set are set.
*/
@Test
public void testMultipleTagSets() {
secondaryReadRepository.setFailOnPrimary(true);
String tagSetProperty = "[{\"name\" : \"value\"}, {\"name1\" : \"value1\", \"name2\" : \"value2\"}, {\"name3\" : \"value3\"}]";
secondaryReadRepository.setTagSet(tagSetProperty);
DBObject firstTagSet = secondaryReadRepository.getFirstTagSet(secondaryReadRepository.getTagSetsFromProperty());
DBObject[] remainingTagSets = secondaryReadRepository.getRemainingTagSets(secondaryReadRepository.getTagSetsFromProperty());
secondaryReadRepository.setReadProperties();
Mockito.verify(template).setReadPreference(ReadPreference.secondary(firstTagSet, remainingTagSets));
}
public ReadPreference getQueryReadPreference()
{
Object propValue = getPropertiesMap().get( QUERY_READ_PREF_PROP );
if( propValue instanceof String )
propValue = toReadPreference( ((String)propValue) );
if( propValue instanceof ReadPreference )
{
// return explicit read preference mode to prevent confusion
return (ReadPreference)propValue;
}
return ReadPreferenceChoice.DEFAULT_PREFERENCE; // non-recognized data type; use default
}
/**
* Test failure on primary.
*/
@Test
public void testFailOnPrimary() {
secondaryReadRepository.setFailOnPrimary(true);
String tagSetProperty = "";
secondaryReadRepository.setTagSet(tagSetProperty);
secondaryReadRepository.setReadProperties();
Mockito.verify(template).setReadPreference(ReadPreference.secondary());
}
/**
* Configure using the specified parameters.
*/
public DbConfig(final List<ServerAddress> serverAddressList,
final String userName,
final String authenticationDatabase,
final String password,
final ReadPreference readPreference) {
this.connectionString = null;
this.serverAddressList = new ArrayList<>(serverAddressList);
this.userName = userName;
this.authenticationDatabase = authenticationDatabase;
this.password = password;
this.maxConnectionsPerHost = new MongoClientOptions.Builder().build().getConnectionsPerHost(); // 100
this.maxConnectionIdleTime = 600000; // 10 minutes
this.readPreference = readPreference;
}
ReadPreferenceParser(ConnectionString connectionString, JsonObject config) {
ReadPreference connStringReadPreference = connectionString != null ? connectionString.getReadPreference() : null;
if (connStringReadPreference != null) {
// Prefer connection string's read preference
readPreference = connStringReadPreference;
} else {
ReadPreference rp;
String rps = config.getString("readPreference");
if (rps != null) {
JsonArray readPreferenceTags = config.getJsonArray("readPreferenceTags");
if (readPreferenceTags == null) {
rp = ReadPreference.valueOf(rps);
if (rp == null) throw new IllegalArgumentException("Invalid ReadPreference " + rps);
} else {
// Support advanced ReadPreference Tags
List<TagSet> tagSet = new ArrayList<>();
readPreferenceTags.forEach(o -> {
String tagString = (String) o;
List<Tag> tags = Stream.of(tagString.trim().split(","))
.map(s -> s.split(":"))
.filter(array -> {
if (array.length != 2) {
throw new IllegalArgumentException("Invalid readPreferenceTags value '" + tagString + "'");
}
return true;
}).map(array -> new Tag(array[0], array[1])).collect(Collectors.toList());
tagSet.add(new TagSet(tags));
});
rp = ReadPreference.valueOf(rps, tagSet);
}
} else {
rp = null;
}
readPreference = rp;
}
}
@Test
public void testReadPreferenceCaseInsenitive() {
JsonObject config = new JsonObject();
config.put("readPreference", "PRIMARY");
ReadPreference rp = new ReadPreferenceParser(null, config).readPreference();
assertNotNull(rp);
assertEquals(ReadPreference.primary(), rp);
}
@Test
public void testReadPreferenceTags() {
List<TagSet> tagSets = new ArrayList<>();
List<Tag> tags = new ArrayList<>();
tags.add(new Tag("dc1", "ny"));
tags.add(new Tag("dc2", "tx"));
tags.add(new Tag("dc3", "ca"));
tagSets.add(new TagSet(tags));
tags = new ArrayList<>();
tags.add(new Tag("ac1", "ny"));
tags.add(new Tag("ac2", "tx"));
tags.add(new Tag("ac3", "ca"));
tagSets.add(new TagSet(tags));
ReadPreference expected = ReadPreference.valueOf("nearest", tagSets);
JsonObject config = new JsonObject();
config.put("readPreference", "nearest");
JsonArray array = new JsonArray();
array.add("dc1:ny,dc2:tx,dc3:ca");
array.add("ac1:ny,ac2:tx,ac3:ca");
config.put("readPreferenceTags", array);
ReadPreference rp = new ReadPreferenceParser(null, config).readPreference();
assertNotNull(rp);
assertEquals(expected, rp);
}
@Override
public void findOne(String statement, Object parameter, ResultHandler handler) {
findOne(statement, parameter, handler, ReadPreference.secondaryPreferred());
}
@Override
public void findOne(String statement, ResultHandler handler) {
findOne(statement, null, handler, ReadPreference.secondaryPreferred());
}
@Override
public <T> List<T> mapReduce(String statement, Object parameter) {
return mapReduce(statement, parameter, null, ReadPreference.secondaryPreferred());
}
@Override
public void group(String statement, Object parameter, ResultHandler handler) {
group(statement, parameter, handler, ReadPreference.secondaryPreferred());
}
@Override
public void find(String statement, Object parameter, ResultHandler handler) {
find(statement, parameter, null, null, handler, ReadPreference.secondaryPreferred());
}
@Override
public <T> Uni<T> runCommand(ClientSession clientSession, Bson command, ReadPreference readPreference,
Class<T> clazz) {
return Wrappers.toUni(database.runCommand(clientSession, command, readPreference, clazz));
}