The following are code examples of com.mongodb.reactivestreams.client.MongoDatabase, collected from open-source projects. They show how the reactive streams database handle is created, configured, injected, mocked, and used.
private DatabaseManager() {
    final MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder()
            .codecRegistry(CODEC_REGISTRY)
            .applicationName(String.format("Shadbot V%s", Config.VERSION));

    if (!Config.IS_SNAPSHOT) {
        final String username = CredentialManager.getInstance().get(Credential.DATABASE_USERNAME);
        final String pwd = CredentialManager.getInstance().get(Credential.DATABASE_PWD);
        final String host = CredentialManager.getInstance().get(Credential.DATABASE_HOST);
        final String port = CredentialManager.getInstance().get(Credential.DATABASE_PORT);
        if (username != null && pwd != null && host != null && port != null) {
            settingsBuilder.applyConnectionString(new ConnectionString(
                    String.format("mongodb://%s:%s@%s:%s/%s", username, pwd, host, port, Config.DATABASE_NAME)));
        }
    }

    this.client = MongoClients.create(settingsBuilder.build());

    final MongoDatabase database = this.client.getDatabase(Config.DATABASE_NAME);
    this.premiumCollection = new PremiumCollection(database);
    this.guildsCollection = new GuildsCollection(database);
    this.lotteryCollection = new LotteryCollection(database);
    this.usersCollection = new UsersCollection(database);
}
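The CODEC_REGISTRY constant referenced above is not shown on this page. Such registries are commonly assembled from the driver's default codecs plus POJO support; a minimal sketch, not the project's actual definition:

import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;

import com.mongodb.MongoClientSettings;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;

// Sketch only: combine the driver's default codecs with automatic POJO mapping.
final CodecRegistry CODEC_REGISTRY = fromRegistries(
        MongoClientSettings.getDefaultCodecRegistry(),
        fromProviders(PojoCodecProvider.builder().automatic(true).build()));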
/**
 * Creates the capped collection {@code collectionName} in {@code database} if it doesn't exist yet.
 *
 * @param database The database to use.
 * @param collectionName The name of the capped collection that should be created.
 * @param cappedCollectionSizeInBytes The size in bytes of the collection that should be created.
 * @param materializer The actor materializer to pre-materialize the restart source.
 * @return Returns the created or retrieved collection.
 */
private static Source<MongoCollection, NotUsed> createOrGetCappedCollection(
        final MongoDatabase database,
        final String collectionName,
        final long cappedCollectionSizeInBytes,
        final ActorMaterializer materializer) {

    final Source<Success, NotUsed> createCollectionSource =
            repeatableCreateCappedCollectionSource(database, collectionName, cappedCollectionSizeInBytes);

    final Source<MongoCollection, NotUsed> infiniteCollectionSource =
            createCollectionSource.map(success -> database.getCollection(collectionName))
                    .flatMapConcat(Source::repeat);

    final Source<MongoCollection, NotUsed> restartSource =
            RestartSource.withBackoff(BACKOFF_MIN, BACKOFF_MAX, 1.0, () -> infiniteCollectionSource);

    // pre-materialize source with BroadcastHub so that a successfully obtained capped collection is reused
    // until the stream fails, whereupon it gets recreated with backoff.
    return restartSource.runWith(BroadcastHub.of(MongoCollection.class, 1), materializer);
}
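Because the source is pre-materialized with a BroadcastHub of buffer size 1, every downstream materialization attaches to the same running stream and sees the most recently created collection. A hypothetical consumer:

// Hypothetical usage: take the current capped collection once and use it.
createOrGetCappedCollection(database, "timestamps", 1024L, materializer)
        .take(1)
        .runWith(Sink.foreach(collection ->
                log.info("Using collection {}", collection.getNamespace())), materializer);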
private static Source<Success, NotUsed> repeatableCreateCappedCollectionSource(
        final MongoDatabase database,
        final String collectionName,
        final long cappedCollectionSizeInBytes) {

    final CreateCollectionOptions collectionOptions = new CreateCollectionOptions()
            .capped(true)
            .sizeInBytes(cappedCollectionSizeInBytes)
            .maxDocuments(1);

    return Source.lazily(
            () -> Source.fromPublisher(database.createCollection(collectionName, collectionOptions)))
            .mapMaterializedValue(whatever -> NotUsed.getInstance())
            .withAttributes(Attributes.inputBuffer(1, 1))
            .recoverWithRetries(1, new PFBuilder<Throwable, Source<Success, NotUsed>>()
                    .match(MongoCommandException.class,
                            MongoTimestampPersistence::isCollectionAlreadyExistsError,
                            error -> Source.single(Success.SUCCESS))
                    .build());
}
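The isCollectionAlreadyExistsError predicate referenced above is not shown here. MongoDB reports an already-existing collection as command error code 48 ("NamespaceExists"), so a plausible implementation is:

// Plausible implementation (assumption, not shown in the snippet): MongoDB
// uses command error code 48 ("NamespaceExists") for an existing collection.
private static boolean isCollectionAlreadyExistsError(final MongoCommandException e) {
    return e.getErrorCode() == 48;
}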
private static void assertWithExpected(final DittoMongoClient mongoClient, final boolean sslEnabled,
        final boolean withCredentials) {

    final MongoClientSettings mongoClientSettings = mongoClient.getSettings();
    assertThat(mongoClientSettings.getClusterSettings().getHosts())
            .isEqualTo(Collections.singletonList(new ServerAddress(KNOWN_SERVER_ADDRESS)));

    final List<MongoCredential> expectedCredentials = withCredentials
            ? Collections.singletonList(
                    MongoCredential.createCredential(KNOWN_USER, KNOWN_DB_NAME, KNOWN_PASSWORD.toCharArray()))
            : Collections.emptyList();
    assertThat(mongoClientSettings.getCredentialList()).isEqualTo(expectedCredentials);
    assertThat(mongoClientSettings.getSslSettings().isEnabled()).isEqualTo(sslEnabled);

    final MongoDatabase mongoDatabase = mongoClient.getDefaultDatabase();
    assertThat(mongoDatabase).isNotNull();
    assertThat(mongoDatabase.getName()).isEqualTo(KNOWN_DB_NAME);
}
/**
 * Create Props of this actor.
 *
 * @param pubSubMediator Akka pub-sub mediator.
 * @param mongoDbConfig the MongoDB configuration settings.
 * @param config configuration with info about event journal, snapshot store and database.
 * @param persistenceOperationsConfig the persistence operations config.
 * @return a Props object.
 */
public static Props props(final ActorRef pubSubMediator,
        final MongoDbConfig mongoDbConfig,
        final Config config,
        final PersistenceOperationsConfig persistenceOperationsConfig) {

    return Props.create(ThingPersistenceOperationsActor.class, () -> {
        final MongoEventSourceSettings eventSourceSettings =
                MongoEventSourceSettings.fromConfig(config, ThingPersistenceActor.PERSISTENCE_ID_PREFIX, true,
                        ThingPersistenceActor.JOURNAL_PLUGIN_ID, ThingPersistenceActor.SNAPSHOT_PLUGIN_ID);
        final MongoClientWrapper mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
        final MongoDatabase db = mongoClient.getDefaultDatabase();
        final NamespacePersistenceOperations namespaceOps =
                MongoNamespacePersistenceOperations.of(db, eventSourceSettings);

        return new ThingPersistenceOperationsActor(pubSubMediator, namespaceOps, mongoClient,
                persistenceOperationsConfig);
    });
}
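A hypothetical call site, starting the actor from these Props:

// Hypothetical usage: start the operations actor in the actor system.
final ActorRef thingOpsActor = actorSystem.actorOf(
        ThingPersistenceOperationsActor.props(pubSubMediator, mongoDbConfig, config, persistenceOperationsConfig),
        "thingOps");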
private KillSwitch startSearchUpdaterStream(final SearchConfig searchConfig,
        final ActorSystem actorSystem,
        final ShardRegionFactory shardRegionFactory,
        final int numberOfShards,
        final ActorRef updaterShard,
        final ActorRef changeQueueActor,
        final MongoDatabase mongoDatabase,
        final BlockedNamespaces blockedNamespaces) {

    final ActorRef thingsShard = shardRegionFactory.getThingsShardRegion(numberOfShards);
    final ActorRef policiesShard = shardRegionFactory.getPoliciesShardRegion(numberOfShards);
    final SearchUpdaterStream searchUpdaterStream =
            SearchUpdaterStream.of(searchConfig, actorSystem, thingsShard, policiesShard, updaterShard,
                    changeQueueActor, mongoDatabase, blockedNamespaces);
    return searchUpdaterStream.start(getContext());
}
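The returned KillSwitch lets the owning actor stop the stream when it shuts down; a sketch of such a lifecycle hook, assuming the actor keeps the switch in a killSwitch field:

// Sketch: tear the search updater stream down together with the actor.
@Override
public void postStop() throws Exception {
    killSwitch.shutdown();
    super.postStop();
}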
/**
 * Create Props of this actor.
 *
 * @param pubSubMediator Akka pub-sub mediator.
 * @param mongoDbConfig the MongoDB configuration settings.
 * @param config configuration with info about event journal, snapshot store and database.
 * @param persistenceOperationsConfig the persistence operations configuration settings.
 * @return a Props object.
 */
public static Props props(final ActorRef pubSubMediator,
        final MongoDbConfig mongoDbConfig,
        final Config config,
        final PersistenceOperationsConfig persistenceOperationsConfig) {

    return Props.create(ConnectionPersistenceOperationsActor.class, () -> {
        final MongoEventSourceSettings eventSourceSettings =
                MongoEventSourceSettings.fromConfig(config, ConnectionPersistenceActor.PERSISTENCE_ID_PREFIX, false,
                        ConnectionPersistenceActor.JOURNAL_PLUGIN_ID, ConnectionPersistenceActor.SNAPSHOT_PLUGIN_ID);
        final MongoClientWrapper mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
        final MongoDatabase db = mongoClient.getDefaultDatabase();
        final EntityPersistenceOperations entitiesOps =
                MongoEntitiesPersistenceOperations.of(db, eventSourceSettings);

        return new ConnectionPersistenceOperationsActor(pubSubMediator, entitiesOps, mongoClient,
                persistenceOperationsConfig);
    });
}
/**
 * Create Props of this actor.
 *
 * @param pubSubMediator Akka pub-sub mediator.
 * @param mongoDbConfig the MongoDB configuration settings.
 * @param config configuration with info about event journal, snapshot store and database.
 * @param persistenceOperationsConfig the persistence operations configuration settings.
 * @return a Props object.
 */
public static Props props(final ActorRef pubSubMediator,
        final MongoDbConfig mongoDbConfig,
        final Config config,
        final PersistenceOperationsConfig persistenceOperationsConfig) {

    return Props.create(PolicyPersistenceOperationsActor.class, () -> {
        final MongoEventSourceSettings eventSourceSettings =
                MongoEventSourceSettings.fromConfig(config, PolicyPersistenceActor.PERSISTENCE_ID_PREFIX, true,
                        PolicyPersistenceActor.JOURNAL_PLUGIN_ID, PolicyPersistenceActor.SNAPSHOT_PLUGIN_ID);
        final MongoClientWrapper mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
        final MongoDatabase db = mongoClient.getDefaultDatabase();
        final NamespacePersistenceOperations namespaceOps =
                MongoNamespacePersistenceOperations.of(db, eventSourceSettings);
        final EntityPersistenceOperations entitiesOps =
                MongoEntitiesPersistenceOperations.of(db, eventSourceSettings);

        return new PolicyPersistenceOperationsActor(pubSubMediator, namespaceOps, entitiesOps, mongoClient,
                persistenceOperationsConfig);
    });
}
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
    log.info("Open MongoDB Sink");

    mongoConfig = MongoConfig.load(config);
    mongoConfig.validate(true, true);

    if (clientProvider != null) {
        mongoClient = clientProvider.get();
    } else {
        mongoClient = MongoClients.create(mongoConfig.getMongoUri());
    }

    final MongoDatabase db = mongoClient.getDatabase(mongoConfig.getDatabase());
    collection = db.getCollection(mongoConfig.getCollection());

    incomingList = Lists.newArrayList();
    flushExecutor = Executors.newScheduledThreadPool(1);
    flushExecutor.scheduleAtFixedRate(() -> flush(),
            mongoConfig.getBatchTimeMs(), mongoConfig.getBatchTimeMs(), TimeUnit.MILLISECONDS);
}
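The flush() method scheduled here is not shown on this page. A minimal sketch of such a periodic batch write, assuming incomingList buffers org.bson.Document instances (the real Pulsar connector also acknowledges the sink records):

// Sketch only: drain the buffer and write it in one reactive batch insert.
private void flush() {
    final List<Document> batch;
    synchronized (incomingList) {
        if (incomingList.isEmpty()) {
            return;
        }
        batch = new ArrayList<>(incomingList);
        incomingList.clear();
    }
    // insertMany returns a reactive streams Publisher; Flowable (already used
    // elsewhere on this page) is a convenient way to subscribe to it.
    Flowable.fromPublisher(collection.insertMany(batch))
            .subscribe(
                    success -> log.debug("Flushed {} documents", batch.size()),
                    error -> log.error("Flush failed", error));
}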
@BeforeMethod
public void setUp() {
    map = TestHelper.createMap(true);

    mockRecord = mock(Record.class);
    mockSinkContext = mock(SinkContext.class);
    mockMongoClient = mock(MongoClient.class);
    mockMongoDb = mock(MongoDatabase.class);
    mockMongoColl = mock(MongoCollection.class);
    mockPublisher = mock(Publisher.class);

    sink = new MongoSink(() -> mockMongoClient);

    when(mockMongoClient.getDatabase(anyString())).thenReturn(mockMongoDb);
    when(mockMongoDb.getCollection(anyString())).thenReturn(mockMongoColl);
    // stub the collection mock directly rather than chaining through the stubbed getCollection call
    when(mockMongoColl.insertMany(any())).thenReturn(mockPublisher);
}
@BeforeMethod
public void setUp() {
    map = TestHelper.createMap(true);

    mockSourceContext = mock(SourceContext.class);
    mockMongoClient = mock(MongoClient.class);
    mockMongoDb = mock(MongoDatabase.class);
    mockMongoColl = mock(MongoCollection.class);
    mockPublisher = mock(ChangeStreamPublisher.class);

    source = new MongoSource(() -> mockMongoClient);

    when(mockMongoClient.getDatabase(anyString())).thenReturn(mockMongoDb);
    when(mockMongoDb.getCollection(anyString())).thenReturn(mockMongoColl);
    when(mockMongoColl.watch()).thenReturn(mockPublisher);
    when(mockPublisher.batchSize(anyInt())).thenReturn(mockPublisher);
    when(mockPublisher.fullDocument(any())).thenReturn(mockPublisher);

    doAnswer((invocation) -> {
        subscriber = invocation.getArgument(0, Subscriber.class);
        return null;
    }).when(mockPublisher).subscribe(any());
}
private Source<Document, NotUsed> find(final MongoDatabase db, final String collection, final Document filter,
        final Document project) {
    return Source.fromPublisher(
            db.getCollection(collection).find(filter).projection(project).sort(ID_DESC)
    );
}
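A hypothetical call site, materializing the source into a list of matching documents:

// Hypothetical usage: run the query and collect the results, newest first.
final CompletionStage<List<Document>> results =
        find(db, "journal", new Document("pid", persistenceId), new Document("_id", 1))
                .runWith(Sink.seq(), materializer);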
/**
 * Create a restart-able SearchUpdaterStream object.
 *
 * @param searchConfig the configuration settings of the Things-Search service.
 * @param actorSystem actor system to run the stream in.
 * @param thingsShard shard region proxy of things.
 * @param policiesShard shard region proxy of policies.
 * @param updaterShard shard region of search updaters.
 * @param changeQueueActor reference of the change queue actor.
 * @param database MongoDB database.
 * @param blockedNamespaces the blocked namespaces.
 * @return a SearchUpdaterStream object.
 */
public static SearchUpdaterStream of(final SearchConfig searchConfig,
        final ActorSystem actorSystem,
        final ActorRef thingsShard,
        final ActorRef policiesShard,
        final ActorRef updaterShard,
        final ActorRef changeQueueActor,
        final MongoDatabase database,
        final BlockedNamespaces blockedNamespaces) {

    final StreamConfig streamConfig = searchConfig.getStreamConfig();
    final StreamCacheConfig cacheConfig = streamConfig.getCacheConfig();

    final String dispatcherName = cacheConfig.getDispatcherName();
    final MessageDispatcher messageDispatcher = actorSystem.dispatchers().lookup(dispatcherName);

    final DeleteConfig deleteConfig = searchConfig.getDeleteConfig();
    final boolean deleteEvent = deleteConfig.isDeleteEvent();

    final EnforcementFlow enforcementFlow =
            EnforcementFlow.of(streamConfig, thingsShard, policiesShard, messageDispatcher, deleteEvent);

    final MongoSearchUpdaterFlow mongoSearchUpdaterFlow = MongoSearchUpdaterFlow.of(database);
    final BulkWriteResultAckFlow bulkWriteResultAckFlow = BulkWriteResultAckFlow.of(updaterShard);

    return new SearchUpdaterStream(searchConfig, enforcementFlow, mongoSearchUpdaterFlow, bulkWriteResultAckFlow,
            changeQueueActor, blockedNamespaces);
}
/**
 * Initializes the things search persistence with the passed in {@code mongoClient}.
 *
 * @param mongoClient the MongoDB persistence wrapper.
 * @param actorSystem the Akka ActorSystem.
 * @since 1.0.0
 */
public MongoThingsSearchPersistence(final DittoMongoClient mongoClient, final ActorSystem actorSystem) {
    final MongoDatabase database = mongoClient.getDefaultDatabase();
    // configure search persistence to stress the primary as little as possible and tolerate inconsistency
    collection = database
            .getCollection(PersistenceConstants.THINGS_COLLECTION_NAME)
            .withReadPreference(ReadPreference.secondaryPreferred());
    log = Logging.getLogger(actorSystem, getClass());
    final ActorMaterializer materializer = ActorMaterializer.create(actorSystem);
    indexInitializer = IndexInitializer.of(database, materializer);
    maxQueryTime = mongoClient.getDittoSettings().getMaxQueryTime();
    hints = MongoHints.empty();
}
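The same relaxation can be pushed further by also weakening the read concern; a sketch under the same "tolerate inconsistency" assumption (not the project's code):

// Sketch: secondary reads plus local read concern for minimal primary load.
final MongoCollection<Document> relaxedCollection = database
        .getCollection(PersistenceConstants.THINGS_COLLECTION_NAME)
        .withReadPreference(ReadPreference.secondaryPreferred())
        .withReadConcern(ReadConcern.LOCAL);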
/**
 * Create a new GridFS bucket with the default {@code 'fs'} bucket name.
 *
 * <p>Requires the concrete {@link MongoDatabaseImpl} implementation of the MongoDatabase interface.</p>
 *
 * @param database the database instance to use with GridFS.
 * @return the GridFSBucket
 */
public static GridFSBucket create(final MongoDatabase database) {
    notNull("database", database);
    if (database instanceof MongoDatabaseImpl) {
        return new GridFSBucketImpl(
                com.mongodb.async.client.gridfs.GridFSBuckets.create(((MongoDatabaseImpl) database).getWrapped()));
    } else {
        throw new IllegalArgumentException("GridFS requires the concrete MongoDatabaseImpl implementation.");
    }
}

/**
 * Create a new GridFS bucket with a custom bucket name.
 *
 * <p>Requires the concrete {@link MongoDatabaseImpl} implementation of the MongoDatabase interface.</p>
 *
 * @param database the database instance to use with GridFS.
 * @param bucketName the custom bucket name to use.
 * @return the GridFSBucket
 */
public static GridFSBucket create(final MongoDatabase database, final String bucketName) {
    notNull("database", database);
    notNull("bucketName", bucketName);
    if (database instanceof MongoDatabaseImpl) {
        return new GridFSBucketImpl(
                com.mongodb.async.client.gridfs.GridFSBuckets.create(((MongoDatabaseImpl) database).getWrapped(),
                        bucketName));
    } else {
        throw new IllegalArgumentException("GridFS requires the concrete MongoDatabaseImpl implementation.");
    }
}
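Usage is then a plain factory call (assuming the enclosing factory class is GridFSBuckets, as in the reactive streams driver):

// Obtain buckets through the factory methods above.
final GridFSBucket defaultBucket = GridFSBuckets.create(database);          // bucket name "fs"
final GridFSBucket imageBucket = GridFSBuckets.create(database, "images"); // custom bucket name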
static CollectionResolver defaultResolver(MongoDatabase database, CodecRegistry registry) {
    Objects.requireNonNull(database, "database");
    Objects.requireNonNull(registry, "registry");
    return entityClass -> {
        final String collectionName = ContainerNaming.DEFAULT.name(entityClass);
        return database.getCollection(collectionName)
                .withDocumentClass(entityClass)
                .withCodecRegistry(registry);
    };
}
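A hypothetical use of the resolver, assuming the CollectionResolver functional interface exposes a resolve(Class) method and a Person entity class exists:

// Hypothetical usage: resolve a collection for an entity class.
final CollectionResolver resolver = defaultResolver(database, registry);
final MongoCollection<?> people = resolver.resolve(Person.class);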
BackendResource(MongoDatabase database) {
    this.database = Objects.requireNonNull(database, "database");
    final ObjectMapper mapper = new ObjectMapper()
            .registerModule(new BsonModule())
            .registerModule(new GuavaModule())
            .registerModule(new Jdk8Module())
            .registerModule(new IdAnnotationModule());
    this.registry = JacksonCodecs.registryFromMapper(mapper);
    this.resolver = new LazyResolver();
    this.backend = new MongoBackend(MongoSetup.of(this.resolver));
}
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
        throws ParameterResolutionException {
    final Class<?> type = parameterContext.getParameter().getType();
    if (MongoDatabase.class.isAssignableFrom(type)) {
        return getOrCreate(extensionContext).instance.database();
    } else if (MongoClient.class.isAssignableFrom(type)) {
        return getOrCreate(extensionContext).instance.client();
    }
    throw new ExtensionConfigurationException(
            String.format("%s supports only %s or %s but yours was %s", MongoExtension.class.getSimpleName(),
                    MongoDatabase.class.getName(), MongoClient.class.getName(), type.getName()));
}
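With this resolver in place, a JUnit 5 test can simply declare the parameter; a sketch, assuming the extension is registered via @ExtendWith:

// Sketch: the extension above injects the MongoDatabase test parameter.
@ExtendWith(MongoExtension.class)
class MongoRepositoryTest {

    @Test
    void insertsOneDocument(final MongoDatabase database) {
        Flowable.fromPublisher(database.getCollection("test").insertOne(new Document("x", 1)))
                .blockingSubscribe();
    }
}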
@SuppressWarnings("CheckReturnValue")
private void clear() {
    // drop all collections
    MongoDatabase database = instance.database();
    Flowable.fromPublisher(database.listCollectionNames())
            .flatMap(col -> database.getCollection(col).drop())
            .toList()
            .blockingGet();
}
ReactiveMongoDatabaseImpl(MongoDatabase database) {
    this.database = database;
}

@Override
public MongoDatabase unwrap() {
    return database;
}
public GuildsCollection(MongoDatabase database) {
    super(database.getCollection(GuildsCollection.NAME));
}

public PremiumCollection(MongoDatabase database) {
    super(database.getCollection(PremiumCollection.NAME));
}

public UsersCollection(MongoDatabase database) {
    super(database.getCollection(UsersCollection.NAME));
}

public LotteryCollection(MongoDatabase database) {
    super(database.getCollection(LotteryCollection.NAME));
}
private IndexInitializer(final MongoDatabase db, final Materializer materializer) {
    this.materializer = materializer;
    this.indexOperations = IndexOperations.of(db);
}

private IndexOperations(final MongoDatabase db) {
    this.db = db;
}

private MongoNamespacePersistenceOperations(final MongoDatabase db,
        final MongoEventSourceSettings eventSourceSettings) {
    this.db = checkNotNull(db, "database");
    selectionProvider = MongoPersistenceOperationsSelectionProvider.of(eventSourceSettings);
}

private MongoEntitiesPersistenceOperations(final MongoDatabase db,
        final MongoEventSourceSettings eventSourceSettings) {
    this.db = requireNonNull(db);
    requireNonNull(eventSourceSettings);
    this.selectionProvider = MongoPersistenceOperationsSelectionProvider.of(eventSourceSettings);
}