下面列出了怎么用com.mongodb.client.MongoClients的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Runs a FIND command followed by a COLLECT command against mocked MongoDB objects and verifies that the
 * variable is stored as {@code DOCUMENT_JSON} wrapped in square brackets.
 */
@SuppressWarnings("unchecked")
@PrepareForTest(MongoClients.class)
@Test
public void testExecuteCommands()
{
    // Stub the query chain from the innermost object outwards: iterable -> collection -> database.
    Bson filter = mock(Bson.class);
    FindIterable<Document> foundDocuments = mock(FindIterable.class);
    when(foundDocuments.spliterator()).thenReturn(List.of(DOCUMENT).spliterator());

    MongoCollection<Document> mongoCollection = mock(MongoCollection.class);
    when(mongoCollection.find(filter)).thenReturn(foundDocuments);

    MongoDatabase mongoDatabase = mockDatabase();
    when(mongoDatabase.getCollection(COLLECTION_KEY)).thenReturn(mongoCollection);

    Set<VariableScope> scopes = Set.of(VariableScope.STORY);
    MongoDbSteps mongoDbSteps = new MongoDbSteps(Map.of(LOCAL_KEY, CONNECTION_KEY), jsonUtils, context);
    mongoDbSteps.executeCommands(
        List.of(commandEntry(MongoCommand.FIND, filter), commandEntry(MongoCommand.COLLECT, filter)),
        COLLECTION_KEY, LOCAL_KEY, LOCAL_KEY, scopes, VARIABLE_KEY);

    String expectedJson = String.format("[%s]", DOCUMENT_JSON);
    verify(context).putVariable(scopes, VARIABLE_KEY, expectedJson);
}
/**
 * Statically mocks {@link MongoClients} so that creating a client for {@code CONNECTION_KEY} yields a mock
 * client, and returns the mock database that client hands out for {@code LOCAL_KEY}.
 *
 * @return the mocked database, ready for further stubbing by the caller
 */
private MongoDatabase mockDatabase()
{
    MongoDatabase databaseMock = mock(MongoDatabase.class);
    MongoClient clientMock = mock(MongoClient.class);
    when(clientMock.getDatabase(LOCAL_KEY)).thenReturn(databaseMock);

    mockStatic(MongoClients.class);
    when(MongoClients.create(CONNECTION_KEY)).thenReturn(clientMock);
    return databaseMock;
}
/**
 * Starts the source task: parses the configuration, opens the MongoDB client and then either prepares the
 * copy-data manager or opens a cursor, depending on {@code shouldCopyData()}.
 *
 * @param props the task configuration properties
 * @throws ConnectException if the configuration cannot be turned into a {@code MongoSourceConfig}
 */
@Override
public void start(final Map<String, String> props) {
    LOGGER.info("Starting MongoDB source task");
    try {
        sourceConfig = new MongoSourceConfig(props);
    } catch (Exception e) {
        throw new ConnectException("Failed to start new task", e);
    }

    mongoClient = MongoClients.create(
            sourceConfig.getConnectionString(), getMongoDriverInformation(CONNECTOR_TYPE));

    final boolean copyExistingData = shouldCopyData();
    if (!copyExistingData) {
        cursor = createCursor(sourceConfig, mongoClient);
    } else {
        setCachedResultAndResumeToken();
        copyDataManager = new MongoCopyDataManager(sourceConfig, mongoClient);
        isCopying.set(true);
    }

    isRunning.set(true);
    LOGGER.info("Started MongoDB source task");
}
/**
 * Returns the shared default database reference, creating the client and the database on first use.
 *
 * <p>The cached reference is checked once without the lock and once more while holding {@code LOCK}; only
 * when it is still unset is a client created. Once a database is cached, the arguments of later calls are
 * not consulted.
 *
 * @param connectionString MongoDB standard connection string
 * @param database         MongoDB database name
 * @return the shared database reference, configured with the default codecs plus an automatic POJO codec
 * @throws RuntimeException if nothing is cached yet and {@code connectionString} is empty
 */
public static MongoDatabase getOrInitDefaultDatabase(String connectionString, String database) {
    if (DEFAULT_DATABASE != null) {
        return DEFAULT_DATABASE;
    }
    synchronized (LOCK) {
        if (DEFAULT_DATABASE != null) {
            return DEFAULT_DATABASE;
        }
        if (StringUtil.isEmpty(connectionString)) {
            throw new RuntimeException("No datasource configuration found for mongodb.");
        }
        DEFAULT_CLIENT = MongoClients.create(connectionString);
        CodecRegistry pojoCodecRegistry = fromRegistries(
                MongoClientSettings.getDefaultCodecRegistry(),
                fromProviders(PojoCodecProvider.builder().automatic(true).build()));
        DEFAULT_DATABASE = DEFAULT_CLIENT.getDatabase(database).withCodecRegistry(pojoCodecRegistry);
        return DEFAULT_DATABASE;
    }
}
/**
 * Builds a client for {@code HOST:PORT} with a primary-preferred read preference.
 *
 * @param useCredentials when {@code true}, authenticate as {@code USER} against {@code ADMIN_DB};
 *                       {@code null} is treated like {@code false}
 * @param replicaSet     name of the replica set to put into the connection string, or {@code null} to
 *                       connect without a {@code replicaSet} option
 * @return a newly created client; the caller is responsible for closing it
 */
private static MongoClient getClient(Boolean useCredentials, String replicaSet) {
    MongoClientSettings.Builder settings = MongoClientSettings.builder();
    // Boolean.TRUE.equals avoids an unboxing NullPointerException when null is passed.
    if (Boolean.TRUE.equals(useCredentials)) {
        MongoCredential credentials = MongoCredential.createCredential(
                USER, ADMIN_DB, PASSWORD.toCharArray());
        settings.credential(credentials);
    }
    StringBuilder connectionString = new StringBuilder(String.format("mongodb://%s:%d", HOST, PORT));
    if (replicaSet != null) {
        // Use the caller-supplied name; previously the argument was only null-checked and a constant was
        // appended instead, so the parameter value was silently ignored.
        connectionString.append(String.format("/?replicaSet=%s", replicaSet));
    }
    ConnectionString uri = new ConnectionString(connectionString.toString());
    settings.applyConnectionString(uri);
    settings.readPreference(ReadPreference.primaryPreferred());
    return MongoClients.create(settings.build());
}
/**
 * {@inheritDoc}
 *
 * <p>After the superclass initialization this takes the logger from the context (if none is set yet) and
 * opens the MongoDB client, database and collection from the configured host, credentials, database name
 * and collection name.
 *
 * @throws ResourceInitializationException if building the connection string or opening the client,
 *         database or collection fails
 */
@Override
public void initialize(UimaContext context) throws ResourceInitializationException {
    super.initialize(context);
    if (logger == null) {
        logger = context.getLogger();
    }
    // Initialize the MongoDB Connection
    try {
        String connectionString;
        if (this.mongoUser == null) {
            connectionString = String.format("mongodb://%s", this.mongoHost);
            logger.log(Level.SEVERE, "Using unauthenticated connection string: " + connectionString);
        } else {
            // Credentials belong in the userinfo part: mongodb://user:password@host.
            // The previous format string contained a mangled "%[...]" sequence, which is not a valid
            // conversion and made String.format throw for every authenticated configuration.
            // NOTE(review): user names or passwords with reserved characters (e.g. '@', ':', '/') need
            // percent-encoding before they are placed here - confirm how these values are configured.
            connectionString = String.format("mongodb://%s:%s@%s", this.mongoUser, this.mongoPw, this.mongoHost);
        }
        this.mongoClient = MongoClients.create(connectionString);
        this.mongoDb = mongoClient.getDatabase(this.mongoDbName);
        this.coll = mongoDb.getCollection(this.collection);
    } catch (Exception e) {
        throw new ResourceInitializationException(e);
    }
}
/**
 * Provides the {@link MongoClient} bean. A configured JNDI name takes precedence and is looked up; otherwise
 * a client is created from {@code mongo.host}.
 *
 * @param host     value of {@code mongo.host}, handed to {@code MongoClients.create}; may be empty
 * @param jndiName value of {@code mongo.jndiName}; may be empty
 * @return the looked-up or newly created client
 * @throws NamingException  if the JNDI lookup fails
 * @throws RuntimeException if neither property is set
 */
@Bean(destroyMethod = "")
public MongoClient mongoClient(@Value("${mongo.host:}") String host, @Value("${mongo.jndiName:}") String jndiName) throws NamingException
{
    if (!Strings.isNullOrEmpty(jndiName))
    {
        return new JndiTemplate().lookup(jndiName, MongoClient.class);
    }
    if (Strings.isNullOrEmpty(host))
    {
        throw new RuntimeException("Either mongo.host or mongo.jndiName must be set");
    }
    return MongoClients.create(host);
}
/**
 * Registers the MongoDB pipeline options and fills the shared static fixtures: a time-stamped collection
 * name, the BigQuery dataset and table names, a MongoDB client and the InfluxDB settings.
 */
@BeforeClass
public static void setUp() {
    PipelineOptionsFactory.register(MongoDBPipelineOptions.class);
    options = TestPipeline.testingPipelineOptions().as(MongoDBPipelineOptions.class);

    // The collection name carries the current time in milliseconds.
    collection = String.format("test_%s", System.currentTimeMillis());

    bigQueryDataset = options.getBigQueryDataset();
    bigQueryTable = options.getBigQueryTable();

    mongoUrl = String.format("mongodb://%s:%s", options.getMongoDBHostName(), options.getMongoDBPort());
    mongoClient = MongoClients.create(mongoUrl);

    settings = InfluxDBSettings.builder()
            .withHost(options.getInfluxHost())
            .withDatabase(options.getInfluxDatabase())
            .withMeasurement(options.getInfluxMeasurement())
            .get();
}
/**
 * Prepares the benchmark state: opens a default client and template, saves one referenced object, saves two
 * holder objects (one with a single reference, one with a reference list) and builds the id queries for both.
 */
@Setup
public void setUp() throws Exception {
    client = MongoClients.create();
    template = new MongoTemplate(client, DB_NAME);

    // Exactly one referenced object is stored; it is saved before being added to the list.
    RefObject referenced = new RefObject();
    template.save(referenced);
    List<RefObject> refObjects = new ArrayList<>();
    refObjects.add(referenced);

    ObjectWithDBRef singleDBRef = new ObjectWithDBRef();
    singleDBRef.ref = referenced;
    template.save(singleDBRef);

    ObjectWithDBRef multipleDBRefs = new ObjectWithDBRef();
    multipleDBRefs.refList = refObjects;
    template.save(multipleDBRefs);

    queryObjectWithDBRef = query(where("id").is(singleDBRef.id));
    queryObjectWithDBRefList = query(where("id").is(multipleDBRefs.id));
}
/**
 * Prepares the benchmark state: opens a default client and template, saves one sample person into
 * {@code COLLECTION_NAME}, and builds the query handles (plain, DTO, closed/open projection and
 * field-restricted) plus the raw collection handle.
 */
@Setup
public void setUp() {
    client = MongoClients.create();
    template = new MongoTemplate(client, DB_NAME);
    mongoCollection = client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME);

    Address address = new Address();
    address.street = "melenium falcon 1";
    address.city = "deathstar";

    Person person = new Person();
    person.firstname = "luke";
    person.lastname = "skywalker";
    person.address = address;
    source = person;
    template.save(source, COLLECTION_NAME);

    // Each handle is built from its own fresh query on the template, as before.
    asPerson = template.query(Person.class).inCollection(COLLECTION_NAME);
    asDtoProjection = template.query(Person.class).inCollection(COLLECTION_NAME).as(DtoProjection.class);
    asClosedProjection = template.query(Person.class).inCollection(COLLECTION_NAME).as(ClosedProjection.class);
    asOpenProjection = template.query(Person.class).inCollection(COLLECTION_NAME).as(OpenProjection.class);
    asPersonWithFieldsRestriction = template.query(Person.class)
            .inCollection(COLLECTION_NAME)
            .matching(new BasicQuery(new Document(), fields));
}
protected TestBase() {
Builder builder = MongoClientSettings.builder();
try {
builder.uuidRepresentation(mapperOptions.getUuidRepresentation());
} catch(Exception ignored) {
// not a 4.0 driver
}
MongoClientSettings clientSettings = builder
.applyConnectionString(new ConnectionString(getMongoURI()))
.build();
this.mongoClient = MongoClients.create(clientSettings);
this.database = getMongoClient().getDatabase(TEST_DB_NAME);
this.ds = Morphia.createDatastore(getMongoClient(), database.getName());
ds.setQueryFactory(new DefaultQueryFactory());
}
/**
 * Parses the connection string, creates the client with {@code CODEC_REGISTRIES} and resolves the database
 * named in the connection string, logging every step at debug level.
 *
 * @param connectionStringSource the MongoDB connection string to parse
 * @param isCapped               stored as-is in the {@code isCapped} field
 * @param collectionSize         stored as-is in the {@code collectionSize} field
 */
private MongoDb4Provider(final String connectionStringSource, final boolean isCapped,
        final Integer collectionSize) {
    this.isCapped = isCapped;
    this.collectionSize = collectionSize;

    LOGGER.debug("Creating ConnectionString {}...", connectionStringSource);
    this.connectionString = new ConnectionString(connectionStringSource);
    LOGGER.debug("Created ConnectionString {}", connectionString);

    LOGGER.debug("Creating MongoClientSettings...");
    final MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();
    settingsBuilder.applyConnectionString(this.connectionString);
    settingsBuilder.codecRegistry(CODEC_REGISTRIES);
    final MongoClientSettings clientSettings = settingsBuilder.build();
    LOGGER.debug("Created MongoClientSettings {}", clientSettings);

    LOGGER.debug("Creating MongoClient {}...", clientSettings);
    this.mongoClient = MongoClients.create(clientSettings);
    LOGGER.debug("Created MongoClient {}", mongoClient);

    final String nameFromConnectionString = this.connectionString.getDatabase();
    LOGGER.debug("Getting MongoDatabase {}...", nameFromConnectionString);
    this.mongoDatabase = this.mongoClient.getDatabase(nameFromConnectionString);
    LOGGER.debug("Got MongoDatabase {}", mongoDatabase);
}
/**
 * Used to get an existing, pooled, connection or to create a new connection
 * for the given connection string.
 *
 * <p>A cached client is reused only while it passes {@code connectionTest}. A cached client that fails the
 * test is closed before it is replaced, so that it does not linger unreferenced and unclosed.
 *
 * @param connStr MongoClient connection details, format is expected to be:
 *                {@code mongodb://<username>:<password>@<hostname>:<port>/?ssl=true&ssl_ca_certs=<certs.pem>&replicaSet=<replica_set>}
 * @return A MongoClient connection if the connection succeeded, else the function will throw.
 */
public synchronized MongoClient getOrCreateConn(String connStr)
{
    logger.info("getOrCreateConn: enter");
    MongoClient result = clientCache.get(connStr);
    if (result == null || !connectionTest(result)) {
        if (result != null) {
            // The stale client is about to be dropped from the cache; release it first.
            // NOTE(review): any caller still holding this instance will see it closed - confirm that a
            // client failing the connection test is never expected to recover for existing holders.
            try {
                result.close();
            }
            catch (RuntimeException ex) {
                // Best effort only: a failure to close must not prevent creating the replacement.
                logger.info("getOrCreateConn: failed to close stale connection: " + ex);
            }
        }
        result = MongoClients.create(connStr);
        clientCache.put(connStr, result);
    }
    logger.info("getOrCreateConn: exit");
    return result;
}
/**
 * Opens a short-lived client for the connection registered under {@code connectionKey}, passes the requested
 * database to the consumer and closes the client afterwards.
 *
 * @param connectionKey    key of the connection string in {@code connections}
 * @param dbKey            name of the database to hand to the consumer
 * @param databaseConsumer action to run against the database
 */
private void executeInDatabase(String connectionKey, String dbKey, Consumer<MongoDatabase> databaseConsumer)
{
    String connectionString = connections.get(connectionKey);
    Validate.validState(connectionString != null, "Connection with key '%s' does not exist", connectionKey);
    try (MongoClient mongoClient = MongoClients.create(connectionString))
    {
        databaseConsumer.accept(mongoClient.getDatabase(dbKey));
    }
}
/**
 * Runs a single command against the mocked database and verifies that the variable is stored as a map
 * containing the entry {@code id -> "1"}.
 */
@PrepareForTest(MongoClients.class)
@Test
public void testExecuteCommand()
{
    MongoDatabase mongoDatabase = mockDatabase();
    when(mongoDatabase.runCommand(COMMAND)).thenReturn(DOCUMENT);

    Set<VariableScope> scopes = Set.of(VariableScope.STORY);
    MongoDbSteps mongoDbSteps = new MongoDbSteps(Map.of(LOCAL_KEY, CONNECTION_KEY), jsonUtils, context);
    mongoDbSteps.executeCommand(COMMAND, LOCAL_KEY, LOCAL_KEY, scopes, VARIABLE_KEY);

    verify(context).putVariable(scopes, VARIABLE_KEY, Map.of("id", "1"));
}
/**
 * Convenience for {@link #builder(String, SupplierEx)}: builds a {@link Document} sink whose client is
 * created from the connection string, whose database and collection are looked up by name, and whose
 * client is closed by the destroy function.
 *
 * @param name             name passed to the builder
 * @param connectionString connection string used to create the client
 * @param database         name of the database to obtain from the client
 * @param collection       name of the collection to obtain from the database
 * @return the sink produced by the builder
 */
public static Sink<Document> mongodb(
        @Nonnull String name,
        @Nonnull String connectionString,
        @Nonnull String database,
        @Nonnull String collection
) {
    return MongoDBSinks
            .<Document>builder(name, () -> MongoClients.create(connectionString))
            .databaseFn(mongoClient -> mongoClient.getDatabase(database))
            .collectionFn(mongoDatabase -> mongoDatabase.getCollection(collection))
            .destroyFn(mongoClient -> mongoClient.close())
            .build();
}
/**
 * Creates a client for the connection string with the given server selection timeout.
 *
 * @param connectionString         MongoDB connection string
 * @param connectionTimeoutSeconds server selection timeout, in seconds
 * @return a newly created client
 */
static MongoClient mongoClient(String connectionString, int connectionTimeoutSeconds) {
    MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();
    settingsBuilder.applyConnectionString(new ConnectionString(connectionString));
    settingsBuilder.applyToClusterSettings(
            cluster -> cluster.serverSelectionTimeout(connectionTimeoutSeconds, SECONDS));
    return MongoClients.create(settingsBuilder.build());
}
/**
 * Returns the client held in the {@code mongoClient} field, creating it from the sink configuration on the
 * first call.
 */
private MongoClient getMongoClient() {
    if (mongoClient != null) {
        return mongoClient;
    }
    mongoClient = MongoClients.create(
            sinkConfig.getConnectionString(), getMongoDriverInformation(CONNECTOR_TYPE));
    return mongoClient;
}
/**
 * Runs {@code isMaster} against the {@code admin} database using a temporary client.
 *
 * @return {@code true} if the reply has a {@code setName} key or its {@code msg} equals {@code isdbgrid};
 *         {@code false} otherwise, including when any exception occurs
 */
private boolean isReplicaSetOrSharded() {
    try (MongoClient probeClient = MongoClients.create(getConnectionString())) {
        BsonDocument isMasterCommand = BsonDocument.parse("{isMaster: 1}");
        Document reply = probeClient.getDatabase("admin").runCommand(isMasterCommand);
        if (reply.containsKey("setName")) {
            return true;
        }
        return reply.get("msg", "").equals("isdbgrid");
    } catch (Exception e) {
        return false;
    }
}
/**
 * Returns the client registered for the URI, creating and registering one on first request, and counts the
 * request in {@code activeClientCountMap} (1 for a new client, incremented for a reused one).
 *
 * @param mongoClientUri connection URI identifying the client
 * @return the new or reused client
 */
public synchronized MongoClient getMongoClientInstance(String mongoClientUri) {
    if (this.mongoUriToClientMap.containsKey(mongoClientUri)) {
        System.out.println("Reusing existing mongo client for " + mongoClientUri);
        MongoClient existingClient = this.mongoUriToClientMap.get(mongoClientUri);
        this.activeClientCountMap.get(mongoClientUri).incrementAndGet();
        return existingClient;
    }

    System.out.println("Creating new mongo client to connect to " + mongoClientUri);
    MongoClient newClient = MongoClients.create(mongoClientUri);
    this.mongoUriToClientMap.put(mongoClientUri, newClient);
    this.activeClientCountMap.put(mongoClientUri, new AtomicInteger(1));
    return newClient;
}
/**
 * Connects to the first configured mongod, sends {@code replSetInitiate} with the configuration built from
 * the whole list and then polls {@code replSetGetStatus} until the wait condition holds.
 *
 * @param mongodConfigList configurations of the mongod instances; the first entry is the one connected to
 * @throws UnknownHostException if the server address of the first entry cannot be resolved
 */
private static void initializeReplicaSet(final List<IMongodConfig> mongodConfigList) throws UnknownHostException {
    final IMongodConfig firstNode = mongodConfigList.get(0);
    final String firstNodeUri = "mongodb://" + firstNode.net().getServerAddress().getHostName() + ":"
            + firstNode.net().getPort();
    final MongoClientSettings clientSettings = MongoClientSettings.builder()
            .applyConnectionString(new ConnectionString(firstNodeUri))
            .build();

    try (MongoClient adminClient = MongoClients.create(clientSettings)) {
        final MongoDatabase adminDb = adminClient.getDatabase("admin");

        final Document isMasterReply = adminDb.runCommand(new Document("isMaster", 1));
        LOGGER.infof("isMaster: %s", isMasterReply);

        // Build the replica set configuration.
        final Document replicaSetConfig = buildReplicaSetConfiguration(mongodConfigList);
        LOGGER.infof("replSetSettings: %s", replicaSetConfig);

        // Initiate the replica set.
        final Document initiateReply = adminDb.runCommand(new Document("replSetInitiate", replicaSetConfig));
        LOGGER.infof("replSetInitiate: %s", initiateReply);

        // Poll the replica set status every 100 ms, for at most one minute, before proceeding.
        // NOTE(review): the wait ends once isReplicaSetStarted(...) returns false - confirm that the
        // negation is intended.
        await()
                .pollInterval(100, TimeUnit.MILLISECONDS)
                .atMost(1, TimeUnit.MINUTES)
                .until(() -> {
                    Document status = adminDb.runCommand(new Document("replSetGetStatus", 1));
                    LOGGER.infof("replSetGetStatus: %s", status);
                    return !isReplicaSetStarted(status);
                });
    }
}
/**
 * Connects to the first configured mongod, sends {@code replSetInitiate} with the configuration built from
 * the whole list and then polls {@code replSetGetStatus} until the wait condition holds.
 *
 * @param mongodConfigList configurations of the mongod instances; the first entry is the one connected to
 * @throws UnknownHostException if the server address of the first entry cannot be resolved
 */
private static void initializeReplicaSet(final List<IMongodConfig> mongodConfigList) throws UnknownHostException {
    final IMongodConfig firstNode = mongodConfigList.get(0);
    final String firstNodeUri = "mongodb://" + firstNode.net().getServerAddress().getHostName() + ":"
            + firstNode.net().getPort();
    final MongoClientSettings clientSettings = MongoClientSettings.builder()
            .applyConnectionString(new ConnectionString(firstNodeUri))
            .build();

    try (MongoClient adminClient = MongoClients.create(clientSettings)) {
        final MongoDatabase adminDb = adminClient.getDatabase("admin");

        final Document isMasterReply = adminDb.runCommand(new Document("isMaster", 1));
        LOGGER.infof("isMaster: %s", isMasterReply);

        // Build the replica set configuration.
        final Document replicaSetConfig = buildReplicaSetConfiguration(mongodConfigList);
        LOGGER.infof("replSetSettings: %s", replicaSetConfig);

        // Initiate the replica set.
        final Document initiateReply = adminDb.runCommand(new Document("replSetInitiate", replicaSetConfig));
        LOGGER.infof("replSetInitiate: %s", initiateReply);

        // Poll the replica set status every 100 ms, for at most one minute, before proceeding.
        // NOTE(review): the wait ends once isReplicaSetStarted(...) returns false - confirm that the
        // negation is intended.
        await()
                .pollInterval(100, TimeUnit.MILLISECONDS)
                .atMost(1, TimeUnit.MINUTES)
                .until(() -> {
                    Document status = adminDb.runCommand(new Document("replSetGetStatus", 1));
                    LOGGER.infof("replSetGetStatus: %s", status);
                    return !isReplicaSetStarted(status);
                });
    }
}
/**
 * Ensures the options carry a {@code mongoConnection}: when none is present and {@code user},
 * {@code password} and {@code host} are all given, a client is created and stored under
 * {@code mongoConnection}, and a default {@code connectionBean} name is added if that key is missing.
 * When the required options are incomplete, only a warning is logged.
 *
 * @param component the component being customized (not used here)
 * @param options   the component options; read and updated in place
 */
@Override
public void customize(ComponentProxyComponent component, Map<String, Object> options) {
    MongoCustomizersUtil.replaceAdminDBIfMissing(options);

    // Set connection parameter
    if (options.containsKey("mongoConnection")) {
        return;
    }

    boolean hasCredentialsAndHost = options.containsKey("user") && options.containsKey("password")
        && options.containsKey("host");
    if (!hasCredentialsAndHost) {
        LOGGER.warn(
            "Not enough information provided to set-up the MongoDB client. Required at least host, user and " +
                "password.");
        return;
    }

    ConnectionParamsConfiguration mongoConf = new ConnectionParamsConfiguration(cast(options));
    // We need to force consumption in order to perform property placeholder done by Camel
    consumeOption(camelContext, options, "password", String.class, mongoConf::setPassword);
    LOGGER.debug("Creating and registering a client connection to {}", mongoConf);

    MongoCredential credentials = MongoCredential.createCredential(
        mongoConf.getUser(),
        mongoConf.getAdminDB(),
        mongoConf.getPassword().toCharArray());
    ConnectionString uri = new ConnectionString(mongoConf.getMongoClientURI());

    MongoClientSettings.Builder settings = MongoClientSettings.builder();
    settings.applyConnectionString(uri);
    settings.credential(credentials);
    options.put("mongoConnection", MongoClients.create(settings.build()));

    if (!options.containsKey("connectionBean")) {
        // We safely put a default name instead of leaving null
        options.put("connectionBean", String.format("%s-%s", mongoConf.getHost(), mongoConf.getUser()));
    }
}
/**
 * Prepares and starts a mongod (version {@code V3_6}) and opens a client to it on localhost, using the port
 * from the started process configuration.
 *
 * @throws IOException if preparing or starting the mongod process fails
 */
@BeforeAll
public static void startMongo() throws IOException {
    mongodExe = starter.prepare(new MongodConfigBuilder().version(Version.Main.V3_6).build());
    mongod = mongodExe.start();

    int port = mongod.getConfig().net().getPort();
    mongo = MongoClients.create("mongodb://localhost:" + port);
}
/**
 * Stores every benchmark result as one document, enriched with build metadata (version, branch, commit,
 * dirty flag, method name, timestamp and snapshot flag), in a collection named after the benchmark class.
 *
 * <p>The client is created from the configured URI. The database is the one named in the URI, or
 * {@code spring-data-mongodb-benchmarks} when the URI names none. The client is closed when the method
 * ends, including when an insert fails.
 *
 * @param results the benchmark results to persist
 * @throws ParseException if the JSON rendering of the results cannot be parsed
 */
private void doWrite(Collection<RunResult> results) throws ParseException {
    Date now = new Date();
    StandardEnvironment env = new StandardEnvironment();
    String projectVersion = env.getProperty("project.version", "unknown");
    String gitBranch = env.getProperty("git.branch", "unknown");
    String gitDirty = env.getProperty("git.dirty", "no");
    String gitCommitId = env.getProperty("git.commit.id", "unknown");

    ConnectionString connectionString = new ConnectionString(this.uri);
    String dbName = StringUtils.hasText(connectionString.getDatabase())
            ? connectionString.getDatabase()
            : "spring-data-mongodb-benchmarks";

    // Parse before connecting so that malformed results never open a connection.
    String resultsJson = ResultsWriter.jsonifyResults(results).trim();
    JSONArray array = (JSONArray) new JSONParser(JSONParser.MODE_PERMISSIVE).parse(resultsJson);

    // Connect to the configured URI; previously a default client was created and the URI was only
    // consulted for the database name.
    try (MongoClient client = MongoClients.create(connectionString)) {
        MongoDatabase db = client.getDatabase(dbName);
        for (Object object : array) {
            JSONObject dbo = (JSONObject) object;
            String benchmark = dbo.get("benchmark").toString();
            String collectionName = extractClass(benchmark);

            Document sink = new Document();
            sink.append("_version", projectVersion);
            sink.append("_branch", gitBranch);
            sink.append("_commit", gitCommitId);
            sink.append("_dirty", gitDirty);
            sink.append("_method", extractBenchmarkName(benchmark));
            sink.append("_date", now);
            sink.append("_snapshot", projectVersion.toLowerCase().contains("snapshot"));
            sink.putAll(dbo);

            db.getCollection(collectionName).insertOne(fixDocumentKeys(sink));
        }
    }
}
/**
 * Creates the client from the configured uri, stores it in the mongoClient field and returns the database
 * named in the uri.
 *
 * @param registry codec registry set on the client settings
 * @return the database named in the uri, obtained from the newly created client
 * @throws Error if uri is null or names no database
 */
private MongoDatabase createDatabase(CodecRegistry registry) {
if (uri == null) throw new Error("uri must not be null");
String database = uri.getDatabase();
if (database == null) throw new Error("uri must have database, uri=" + uri);
var watch = new StopWatch();
try {
// connectionPoolSettings is a builder field; the wait time is set on it here, before it is built below
connectionPoolSettings.maxWaitTime(timeoutInMs, TimeUnit.MILLISECONDS); // pool checkout timeout
// the same timeout is used for both connecting and reading on a socket
var socketSettings = SocketSettings.builder()
.connectTimeout((int) timeoutInMs, TimeUnit.MILLISECONDS)
.readTimeout((int) timeoutInMs, TimeUnit.MILLISECONDS)
.build();
var clusterSettings = ClusterSettings.builder()
.serverSelectionTimeout(timeoutInMs * 3, TimeUnit.MILLISECONDS) // able to try 3 servers
.build();
// the connection string is applied last in this chain, after the explicit pool/socket/cluster settings
// NOTE(review): presumably options present in the uri then take precedence over those settings - confirm
var settings = MongoClientSettings.builder()
.applicationName(LogManager.APP_NAME)
.codecRegistry(registry)
.applyToConnectionPoolSettings(builder -> builder.applySettings(connectionPoolSettings.build()))
.applyToSocketSettings(builder -> builder.applySettings(socketSettings))
.applyToClusterSettings(builder -> builder.applySettings(clusterSettings))
.applyConnectionString(uri)
.build();
mongoClient = MongoClients.create(settings);
return mongoClient.getDatabase(database);
} finally {
// logged whether or not client creation succeeded
// NOTE(review): the uri is logged as-is; if it can carry credentials they would end up in the log - confirm
logger.info("create mongo client, uri={}, elapsed={}", uri, watch.elapsed());
}
}
/**
 * Creates the client bean from the configured username, password, host and port, with a codec registry made
 * of the default codecs plus an automatic POJO codec provider.
 *
 * @return a newly created client
 */
@Override
@Bean
public com.mongodb.client.MongoClient mongoClient() {
    final CodecRegistry pojoCodecRegistry = fromRegistries(MongoClient.getDefaultCodecRegistry(),
            fromProviders(PojoCodecProvider.builder().automatic(true).build()));
    // Credentials belong in the userinfo part: mongodb://user:password@host:port.
    // The previous format string contained a mangled "%[...]" sequence, which is not a valid conversion
    // and made String.format throw whenever this bean was created.
    // NOTE(review): user names or passwords with reserved characters (e.g. '@', ':', '/') need
    // percent-encoding before they are placed here - confirm how these values are configured.
    final String connectionsString = String.format("mongodb://%s:%s@%s:%s", this.username, this.password, this.host, this.port);
    final MongoClientSettings settings = MongoClientSettings.builder()
            .codecRegistry(pojoCodecRegistry)
            .applyConnectionString(new ConnectionString(connectionsString))
            .build();
    return MongoClients.create(settings);
}
/**
 * Starts a {@code MongoServer} with a {@code MemoryBackend}, binds it, and returns a client connected to the
 * bound port on localhost. Both the server and the client are kept in fields.
 *
 * <p>This replaces fongo with mongo-java-server, according to
 * https://github.com/fakemongo/fongo/issues/337
 */
@Override
public MongoClient mongoClient() {
    server = new MongoServer(new MemoryBackend());
    // bind() with no arguments returns the address the server is actually listening on.
    InetSocketAddress boundAddress = server.bind();
    int boundPort = boundAddress.getPort();
    client = MongoClients.create("mongodb://localhost:" + boundPort);
    return client;
}
/**
 * Creates the client from {@code getConnectionString()}, creates the Morphia datastore for the configured
 * database, maps the configured model package and logs what was set up.
 */
private void connect() {
    final String databaseName = this.config.getMongoDbName();
    final String modelPackage = this.config.getMongoPackage();

    this.mongoClient = MongoClients.create(getConnectionString());
    this.datastore = Morphia.createDatastore(this.mongoClient, databaseName);
    this.datastore.getMapper().mapPackage(modelPackage);

    LOG.info("Created MongoClient connected to {}:{} with credentials = {}",
            this.config.getMongoHost(), this.config.getMongoPort(), this.config.isMongoAuth());
    LOG.info("Mapped Morphia models of package '{}' and created Morphia Datastore conntected to database '{}'",
            modelPackage, databaseName);
}
/**
 * Starts a {@code mongo:4.0.3} container exposing port 27017, connects a client to its mapped port, runs the
 * transaction markers and finally closes the container.
 */
@Override
public void executeApp() throws Exception {
    final int mongoPort = 27017;
    GenericContainer<?> mongoContainer = new GenericContainer<>("mongo:4.0.3");
    mongoContainer.setExposedPorts(Arrays.asList(mongoPort));
    mongoContainer.start();
    try {
        String containerHost = mongoContainer.getContainerIpAddress();
        Integer mappedPort = mongoContainer.getMappedPort(mongoPort);
        mongoClient = MongoClients.create("mongodb://" + containerHost + ":" + mappedPort);
        beforeTransactionMarker();
        transactionMarker();
    } finally {
        mongoContainer.close();
    }
}