下面列出了 com.mongodb.reactivestreams.client.MongoCollection 类的 API 用法示例代码，您也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a publisher of news letters that is truncated to {@code elements} items.
 *
 * <p>The database is seeded through {@link #prepareItemsInDatabase(long)} before the
 * publisher is assembled. The scheduling period is {@code 1000 / elements} milliseconds,
 * clamped to a minimum of one millisecond.
 *
 * @param elements the number of items the returned publisher may emit at most
 * @return a publisher limited to {@code elements} items
 */
@Override
public Publisher<NewsLetter> createPublisher(long elements) {
    MongoCollection<News> newsCollection = mongoClient()
            .getDatabase("news")
            .getCollection("news", News.class);

    // Integer division yields 0 once more than 1000 elements are requested; never go below 1 ms.
    int period = elements > 0 ? Math.max(1, (int) (1000 / elements)) : 1;

    prepareItemsInDatabase(elements);

    Publisher<NewsLetter> source = new NewsServicePublisher(smp ->
            new ScheduledPublisher<>(
                    () -> new NewsPreparationOperator(
                            new DBPublisher(newsCollection, "tech"),
                            "Some Digest"
                    ),
                    period,
                    TimeUnit.MILLISECONDS
            ).subscribe(smp)
    );

    return Flowable.fromPublisher(source).take(elements);
}
/**
 * Creates a news letter publisher whose backing collection handle is obtained
 * before {@code WithEmbeddedMongo.tearDownMongo()} is invoked.
 *
 * @return a publisher built on a collection handle that predates the tear-down
 */
@Override
public Publisher<NewsLetter> createFailedPublisher() {
    MongoCollection<News> newsCollection = mongoClient()
            .getDatabase("news")
            .getCollection("news", News.class);

    // Tear down only after the collection handle has been acquired.
    WithEmbeddedMongo.tearDownMongo();

    return new NewsServicePublisher(smp ->
            new ScheduledPublisher<>(
                    () -> new NewsPreparationOperator(
                            new DBPublisher(newsCollection, "tech"),
                            "Some Digest"
                    ),
                    1,
                    TimeUnit.MILLISECONDS
            ).subscribe(smp)
    );
}
/**
 * Drops the {@code news} collection and refills it with {@code elements} generated items.
 *
 * <p>Does nothing for non-positive counts. For {@code Long.MAX_VALUE} and
 * {@code Integer.MAX_VALUE} the pipeline is only subscribed (not awaited); for any
 * other count the call blocks until the pipeline terminates.
 *
 * @param elements the number of items to generate and insert
 */
private void prepareItemsInDatabase(long elements) {
    if (elements <= 0) {
        return;
    }

    MongoCollection<News> newsCollection = mongoClient()
            .getDatabase("news")
            .getCollection("news", News.class);

    // Generated items are grouped into 500 ms time windows and inserted window by window.
    Flowable<Success> inserts = Flowable.rangeLong(0L, elements)
            .map(l -> NewsHarness.generate())
            .buffer(500, TimeUnit.MILLISECONDS)
            .flatMap(newsCollection::insertMany);

    Flowable<Success> dropThenInsert = Flowable.fromPublisher(newsCollection.drop())
            .ignoreElements()
            .andThen(inserts);

    boolean fireAndForget = elements == Long.MAX_VALUE || elements == Integer.MAX_VALUE;
    if (fireAndForget) {
        dropThenInsert.subscribe();
    } else {
        dropThenInsert.blockingSubscribe();
    }
}
/**
 * Deletes all documents matching {@code filter} through a single unordered bulk write.
 *
 * <p>The resulting source emits an empty {@code Optional} when the bulk write yields a
 * result, or an {@code Optional} holding the failure when the stream fails.
 *
 * @param collection the collection to delete from.
 * @param filter the filter selecting the documents to delete.
 * @return a source of an optional error.
 */
private static Source<Optional<Throwable>, NotUsed> doDeleteByFilter(final MongoCollection<Document> collection,
        final Bson filter) {
    // https://stackoverflow.com/a/33164008
    // claims unordered bulk ops halve MongoDB load
    final BulkWriteOptions unorderedOptions = new BulkWriteOptions().ordered(false);
    final List<WriteModel<Document>> deleteRequests =
            Collections.singletonList(new DeleteManyModel<>(filter));

    return Source.fromPublisher(collection.bulkWrite(deleteRequests, unorderedOptions))
            .map(bulkWriteResult -> {
                if (LOGGER.isDebugEnabled()) {
                    // in contrast to Bson, BsonDocument has meaningful toString()
                    final BsonDocument printableFilter = BsonUtil.toBsonDocument(filter);
                    LOGGER.debug("Deleted <{}> documents from collection <{}>. Filter was <{}>.",
                            bulkWriteResult.getDeletedCount(), collection.getNamespace(), printableFilter);
                }
                return Optional.<Throwable>empty();
            })
            // turn a stream failure into a regular element carrying the throwable
            .recoverWithRetries(RETRY_ATTEMPTS,
                    new PFBuilder<Throwable, Source<Optional<Throwable>, NotUsed>>()
                            .matchAny(error -> Source.single(Optional.of(error)))
                            .build());
}
/**
 * Builds a pre-materialized source of the capped collection {@code collectionName} in {@code database}.
 * The collection source comes from {@code repeatableCreateCappedCollectionSource} and is wrapped in a
 * restart source with backoff.
 *
 * @param database The database to use.
 * @param collectionName The name of the capped collection.
 * @param cappedCollectionSizeInBytes The size in bytes passed on for the capped collection.
 * @param materializer The actor materializer to pre-materialize the restart source.
 * @return A source that repeatedly emits the obtained collection.
 */
private static Source<MongoCollection, NotUsed> createOrGetCappedCollection(
        final MongoDatabase database,
        final String collectionName,
        final long cappedCollectionSizeInBytes,
        final ActorMaterializer materializer) {

    // after the creation source emits, look the collection up and repeat it indefinitely
    final Source<MongoCollection, NotUsed> infiniteCollectionSource =
            repeatableCreateCappedCollectionSource(database, collectionName, cappedCollectionSizeInBytes)
                    .map(success -> database.getCollection(collectionName))
                    .flatMapConcat(Source::repeat);

    // pre-materialize source with BroadcastHub so that a successfully obtained capped collection is reused
    // until the stream fails, whereupon it gets recreated with backoff.
    return RestartSource.withBackoff(BACKOFF_MIN, BACKOFF_MAX, 1.0, () -> infiniteCollectionSource)
            .runWith(BroadcastHub.of(MongoCollection.class, 1), materializer);
}
/**
 * Seeds an employee collection and an org-archive collection, puts a unique compound index on
 * the archive, runs {@code updateOrgArchiveInsertOnly} and expects exactly two additional
 * archive entries, both for fiscal year 2019.
 */
@Test
public void mustNotOverwriteExistingDocuments() throws JsonProcessingException {
    String employeeCollection = RandomStringUtils.randomAlphabetic(10);
    String orgArchiveColl = RandomStringUtils.randomAlphabetic(10);

    // --- seed the source collection
    List<Employee> seededEmployees = addEmployeeDocuments(employeeCollection, EMPLOYEE_DOCS);
    MongoCollection<Document> employeeColl = mongoTemplate.getCollection(employeeCollection);
    validateCount(employeeCollection, seededEmployees.size());

    // --- seed the archive collection
    List<OrgArchiveEntry> seededArchive = addOrgArchiveEntries(orgArchiveColl, ORG_ARCHIVE_DOCS);
    MongoCollection<Document> orgArchiveCollection = mongoTemplate.getCollection(orgArchiveColl);
    validateCount(orgArchiveColl, seededArchive.size());

    // --- unique compound index on (fiscalYear, dept)
    IndexDefinition uniqueYearAndDept =
            new CompoundIndexDefinition(Document.parse("{'fiscalYear': 1, 'dept': 1}")).unique();
    mongoTemplate.indexOps(orgArchiveColl).ensureIndex(uniqueYearAndDept).block();

    // --- exercise
    zooEmployeeRepository.updateOrgArchiveInsertOnly(employeeCollection, orgArchiveColl).block();

    // --- verify
    validateCount(orgArchiveColl, seededArchive.size() + 2);
    Query fiscalYear2019 = new Query(Criteria.where("fiscalYear").is(2019));
    List<OrgArchiveEntry> newArchiveEntries = mongoTemplate
            .find(fiscalYear2019, OrgArchiveEntry.class, orgArchiveColl)
            .collectList()
            .block();
    assertNotNull(newArchiveEntries);
    assertEquals(newArchiveEntries.size(), 2);
}
/**
 * Prepares the configuration map, the mocks and the sink under test before each test method.
 * The sink receives a supplier that hands out the mocked client.
 */
@BeforeMethod
public void setUp() {
    map = TestHelper.createMap(true);

    // collaborators of the sink
    mockRecord = mock(Record.class);
    mockSinkContext = mock(SinkContext.class);

    // mocked driver chain: client -> database -> collection -> publisher
    mockMongoClient = mock(MongoClient.class);
    mockMongoDb = mock(MongoDatabase.class);
    mockMongoColl = mock(MongoCollection.class);
    mockPublisher = mock(Publisher.class);

    sink = new MongoSink(() -> mockMongoClient);

    when(mockMongoClient.getDatabase(anyString()))
            .thenReturn(mockMongoDb);
    when(mockMongoDb.getCollection(anyString()))
            .thenReturn(mockMongoColl);
    // insertMany is stubbed on whatever the database mock hands out for any collection name
    when(mockMongoDb.getCollection(anyString()).insertMany(any()))
            .thenReturn(mockPublisher);
}
/**
 * Prepares the configuration map, the mocks and the source under test before each test method.
 * The subscriber passed to the mocked change-stream publisher is captured in {@code subscriber}.
 */
@BeforeMethod
public void setUp() {
    map = TestHelper.createMap(true);

    mockSourceContext = mock(SourceContext.class);

    // mocked driver chain: client -> database -> collection -> change stream publisher
    mockMongoClient = mock(MongoClient.class);
    mockMongoDb = mock(MongoDatabase.class);
    mockMongoColl = mock(MongoCollection.class);
    mockPublisher = mock(ChangeStreamPublisher.class);

    source = new MongoSource(() -> mockMongoClient);

    when(mockMongoClient.getDatabase(anyString()))
            .thenReturn(mockMongoDb);
    when(mockMongoDb.getCollection(anyString()))
            .thenReturn(mockMongoColl);
    when(mockMongoColl.watch())
            .thenReturn(mockPublisher);

    // the fluent configuration calls hand back the same mocked publisher
    when(mockPublisher.batchSize(anyInt()))
            .thenReturn(mockPublisher);
    when(mockPublisher.fullDocument(any()))
            .thenReturn(mockPublisher);

    // remember the subscriber handed to subscribe(...)
    doAnswer(invocation -> {
        subscriber = invocation.getArgument(0, Subscriber.class);
        return null;
    }).when(mockPublisher).subscribe(any());
}
/**
 * Recreates the {@code Person} collection as a capped collection (1 MiB, at most 100 documents)
 * and saves four people through the repository, waiting for the save to terminate without errors.
 */
@Before
public void setUp() {
    CollectionOptions cappedOptions = CollectionOptions.empty()
            .size(1024 * 1024)
            .maxDocuments(100)
            .capped();

    // drop the collection when it already exists, then create it with the capped options
    Mono<MongoCollection<Document>> recreateCollection = operations.collectionExists(Person.class)
            .flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists))
            .then(operations.createCollection(Person.class, cappedOptions));

    StepVerifier.create(recreateCollection)
            .expectNextCount(1)
            .verifyComplete();

    Flowable<Person> people = Flowable.just(
            new Person("Walter", "White", 50),
            new Person("Skyler", "White", 45),
            new Person("Saul", "Goodman", 42),
            new Person("Jesse", "Pinkman", 27));

    repository.saveAll(people)
            .test()
            .awaitCount(4)
            .assertNoErrors()
            .awaitTerminalEvent();
}
/**
 * Recreates the {@code Person} collection as a capped collection (1 MiB, at most 100 documents)
 * and inserts four people through {@code operations}, verifying that four items are emitted.
 */
@Before
public void setUp() {
    CollectionOptions cappedOptions = CollectionOptions.empty()
            .size(1024 * 1024)
            .maxDocuments(100)
            .capped();

    // drop the collection when it already exists, then create it with the capped options
    Mono<MongoCollection<Document>> recreateCollection = operations.collectionExists(Person.class)
            .flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists))
            .then(operations.createCollection(Person.class, cappedOptions));

    StepVerifier.create(recreateCollection)
            .expectNextCount(1)
            .verifyComplete();

    Flux<Person> people = Flux.just(
            new Person("Walter", "White", 50),
            new Person("Skyler", "White", 45),
            new Person("Saul", "Goodman", 42),
            new Person("Jesse", "Pinkman", 27));

    Flux<Person> insertAll = operations.insertAll(people.collectList());

    StepVerifier.create(insertAll)
            .expectNextCount(4)
            .verifyComplete();
}
/**
 * Verifies that stored documents carry an {@code _id} attribute and no {@code id} attribute,
 * and that the identifiers round-trip both through the raw collection and the repository.
 */
@Test
void idAttribute() {
    StringHolderRepository repository = new StringHolderRepository(resource.backend());
    ImmutableStringHolder first = TypeHolder.StringHolder.generator().get().withId("id1");
    ImmutableStringHolder second = first.withId("id2");
    repository.insertAll(Arrays.asList(first, second));

    // read the raw documents back, bypassing the repository
    MongoCollection<BsonDocument> rawCollection = resource.collection(TypeHolder.StringHolder.class)
            .withDocumentClass(BsonDocument.class);
    List<BsonDocument> docs = Flowable.fromPublisher(rawCollection.find())
            .toList()
            .blockingGet();
    Checkers.check(docs).hasSize(2);

    // has _id attribute
    Checkers.check(
            docs.stream()
                    .flatMap(doc -> doc.keySet().stream())
                    .collect(Collectors.toSet()))
            .has("_id");

    // does not have 'id' attribute only '_id' (with underscore which is mongo specific) in collected documents
    Checkers.check(
            docs.stream()
                    .flatMap(doc -> doc.keySet().stream())
                    .collect(Collectors.toSet()))
            .not().has("id");

    Checkers.check(
            docs.stream()
                    .map(doc -> doc.get("_id").asString().getValue())
                    .collect(Collectors.toList()))
            .hasContentInAnyOrder("id1", "id2");

    // using repository
    Checkers.check(
            repository.findAll().fetch().stream()
                    .map(TypeHolder.StringHolder::id)
                    .collect(Collectors.toList()))
            .hasContentInAnyOrder("id1", "id2");
}
/**
 * Purges the data described by {@code selection}: drops the whole collection when the selection
 * covers it entirely, otherwise deletes only the documents matching the selection's filter.
 *
 * @param selection describes the collection and, optionally, the filter to purge by.
 * @return the source returned by the respective {@code MongoOpsUtil} operation.
 */
private Source<List<Throwable>, NotUsed> purge(final MongoPersistenceOperationsSelection selection) {
    final MongoCollection<Document> target = db.getCollection(selection.getCollectionName());

    if (!selection.isEntireCollection()) {
        return MongoOpsUtil.deleteByFilter(target, selection.getFilter());
    }
    return MongoOpsUtil.drop(target);
}
/**
 * Drops {@code collection}.
 *
 * <p>The resulting source emits an empty {@code Optional} when the drop publisher yields a
 * result, or an {@code Optional} holding the failure when the stream fails.
 *
 * @param collection the collection to drop.
 * @return a source of an optional error.
 */
private static Source<Optional<Throwable>, NotUsed> doDrop(final MongoCollection<Document> collection) {
    return Source.fromPublisher(collection.drop())
            .map(dropResult -> {
                LOGGER.debug("Successfully dropped collection <{}>.", collection.getNamespace());
                return Optional.<Throwable>empty();
            })
            // turn a stream failure into a regular element carrying the throwable
            .recoverWithRetries(RETRY_ATTEMPTS,
                    new PFBuilder<Throwable, Source<Optional<Throwable>, NotUsed>>()
                            .matchAny(error -> Source.single(Optional.of(error)))
                            .build());
}
/**
 * Creates a new initialized instance backed by a capped collection in the client's default database.
 *
 * @param collectionName The name of the collection.
 * @param mongoClient the client wrapper holding the connection information.
 * @param materializer an actor materializer to materialize the restart-source of the timestamp collection.
 * @return a new initialized instance.
 */
public static MongoTimestampPersistence initializedInstance(final String collectionName,
        final DittoMongoClient mongoClient, final ActorMaterializer materializer) {

    // the collection is requested with the minimum capped size
    return new MongoTimestampPersistence(
            createOrGetCappedCollection(
                    mongoClient.getDefaultDatabase(),
                    collectionName,
                    MIN_CAPPED_COLLECTION_SIZE_IN_BYTES,
                    materializer));
}
/**
 * Lists persistence IDs from the journal in batches, starting above {@code lowerBound}.
 * Each batch is obtained from {@code listJournalPidsAbove}; batching itself is delegated to
 * {@code unfoldBatchedSource}.
 *
 * @param journal the journal collection.
 * @param lowerBound the value the listing starts from.
 * @param batchSize the size passed to each batch query.
 * @param mat the actor materializer passed to {@code unfoldBatchedSource}.
 * @param maxBackOff the maximum backoff passed to each batch query.
 * @param maxRestarts the maximum number of restarts passed to each batch query.
 * @return a source of lists of persistence IDs.
 */
private Source<List<String>, NotUsed> listPidsInJournal(final MongoCollection<Document> journal,
        final String lowerBound, final int batchSize, final ActorMaterializer mat, final Duration maxBackOff,
        final int maxRestarts) {

    return this.unfoldBatchedSource(
            lowerBound,
            mat,
            Function.identity(),
            batchStart -> listJournalPidsAbove(journal, batchStart, batchSize, maxBackOff, maxRestarts));
}
/**
 * Lists the newest snapshots from the snapshot store in batches, starting from {@code lowerBound}.
 * Each batch is obtained from {@code listNewestActiveSnapshotsByBatch}; the next start value is
 * taken from {@code SnapshotBatch::getMaxPid}.
 *
 * @param snapshotStore the snapshot store collection.
 * @param lowerBound the value the listing starts from.
 * @param batchSize the size passed to each batch query.
 * @param mat the actor materializer passed to {@code unfoldBatchedSource}.
 * @param snapshotFields the snapshot fields passed to each batch query.
 * @return a source of the items of each snapshot batch.
 */
private Source<List<Document>, NotUsed> listNewestSnapshots(final MongoCollection<Document> snapshotStore,
        final String lowerBound,
        final int batchSize,
        final ActorMaterializer mat,
        final String... snapshotFields) {

    return unfoldBatchedSource(
            lowerBound,
            mat,
            SnapshotBatch::getMaxPid,
            batchStart -> listNewestActiveSnapshotsByBatch(snapshotStore, batchStart, batchSize, snapshotFields))
            // flatten what unfoldBatchedSource emits, then keep only each batch's items
            .mapConcat(x -> x)
            .map(SnapshotBatch::getItems);
}
/**
 * Queries the journal for distinct persistence IDs greater than {@code start}, in ascending order,
 * looking at no more than {@code batchSize} journal entries. The aggregation is restarted with
 * backoff on failure.
 *
 * @param journal the journal collection.
 * @param start the exclusive lower bound; an empty string disables the lower bound.
 * @param batchSize the limit applied before grouping.
 * @param maxBackOff the maximum backoff between restarts.
 * @param maxRestarts the maximum number of restarts.
 * @return a source of persistence IDs.
 */
private Source<String, NotUsed> listJournalPidsAbove(final MongoCollection<Document> journal, final String start,
        final int batchSize, final Duration maxBackOff, final int maxRestarts) {

    final List<Bson> stages = new ArrayList<>(5);

    // optional match stage
    final boolean hasLowerBound = !start.isEmpty();
    if (hasLowerBound) {
        stages.add(Aggregates.match(Filters.gt(PROCESSOR_ID, start)));
    }

    // sort stage
    stages.add(Aggregates.sort(Sorts.ascending(PROCESSOR_ID)));

    // limit stage. It should come before group stage or MongoDB would scan the entire journal collection.
    stages.add(Aggregates.limit(batchSize));

    // group stage
    stages.add(Aggregates.group("$" + PROCESSOR_ID));

    // sort stage 2 -- order after group stage is not defined
    stages.add(Aggregates.sort(Sorts.ascending(ID)));

    return RestartSource.onFailuresWithBackoff(
            Duration.ofSeconds(1L), // min backoff
            maxBackOff,
            0.1, // random factor
            maxRestarts,
            () -> Source.fromPublisher(journal.aggregate(stages))
                    .filter(doc -> doc.containsKey(ID))
                    .map(doc -> doc.getString(ID)));
}
/**
 * Writes two timestamps and expects the collection to report a count of exactly one document.
 */
@Test
public void ensureCollectionIsCapped() throws Exception {
    final MongoCollection<Document> collection = syncPersistence.getCollection()
            .runWith(Sink.head(), materializer)
            .toCompletableFuture()
            .get();

    // two consecutive writes
    for (int write = 0; write < 2; write++) {
        runBlocking(syncPersistence.setTimestamp(Instant.now()));
    }

    assertThat(runBlocking(Source.fromPublisher(collection.count())))
            .containsExactly(1L);
}
/**
 * Stores the given collaborators; performs no other work.
 *
 * @param collection the collection this persistence operates on.
 * @param log the logging adapter.
 * @param indexInitializer the index initializer.
 * @param maxQueryTime the maximum query time.
 * @param hints the MongoDB hints.
 */
private MongoThingsSearchPersistence(
        final MongoCollection<Document> collection,
        final LoggingAdapter log,
        final IndexInitializer indexInitializer,
        final Duration maxQueryTime,
        final MongoHints hints) {

    this.collection = collection;
    this.indexInitializer = indexInitializer;
    this.hints = hints;
    this.maxQueryTime = maxQueryTime;
    this.log = log;
}
/**
 * Tries to drop {@code collection} up to 20 times, calling {@code backoff()} after every failed
 * attempt. Returns as soon as one attempt succeeds.
 *
 * @param collection the collection to drop.
 * @throws RuntimeException the failure of the last attempt when all attempts failed.
 */
private void dropCollectionWithBackoff(final MongoCollection<Document> collection) {
    final int maxAttempts = 20;
    RuntimeException lastFailure = null;

    int attempt = 0;
    while (attempt < maxAttempts) {
        try {
            waitFor(Source.fromPublisher(collection.drop()));
            return;
        } catch (final RuntimeException e) {
            lastFailure = e;
            backoff();
        }
        ++attempt;
    }

    throw lastFailure;
}
/**
 * Looks up the first user document matching the configured find-by-username query.
 *
 * <p>Every {@code ?} placeholder in the configured query is substituted with {@code username},
 * the result is passed through {@code convertToJsonString} and parsed into a BSON query.
 *
 * @param username the user name to substitute into the query template
 * @return the first matching document, or an empty {@code Maybe} when the query yields none
 */
private Maybe<Document> findUserByUsername(String username) {
    MongoCollection<Document> usersCol = this.mongoClient
            .getDatabase(this.configuration.getDatabase())
            .getCollection(this.configuration.getUsersCollection());

    // Literal substitution: String.replaceAll would interpret '$' and '\' in the user name as
    // group references / escapes of the replacement string and either throw or garble the query.
    // NOTE(review): the user name still ends up inside a textual query that is parsed afterwards;
    // characters with structural meaning for the query (braces, commas, colons, quotes) are not
    // escaped here -- confirm that callers validate the user name, or build the filter from a
    // parsed template instead.
    String rawQuery = this.configuration.getFindUserByUsernameQuery().replace("?", username);
    String jsonQuery = convertToJsonString(rawQuery);
    BsonDocument query = BsonDocument.parse(jsonQuery);

    return Observable.fromPublisher(usersCol.find(query).first()).firstElement();
}
/**
 * Creates the {@code users} collection and inserts a single sample user, blocking on the first
 * element emitted by each operation.
 */
@Override
public void afterPropertiesSet() throws Exception {
    // create the collection and wait for the operation to emit
    Observable.fromPublisher(mongoDatabase.createCollection("users"))
            .blockingFirst();

    MongoCollection<Document> users = mongoDatabase.getCollection("users");

    Document sampleUser = new Document()
            .append("username", "bob")
            .append("password", "bobspassword");

    Observable.fromPublisher(users.insertOne(sampleUser))
            .blockingFirst();
}
/**
 * Creates a session on {@code collection}.
 *
 * <p>When the key metadata reports a defined, expression-based key consisting of exactly one
 * element, the default path naming is wrapped in a {@code MongoPathNaming} for that key path;
 * otherwise the default path naming is used as is.
 *
 * @param collection the collection to operate on; must not be {@code null}
 * @param keyExtractor the key extractor; must not be {@code null}
 */
MongoSession(MongoCollection<?> collection, KeyExtractor keyExtractor) {
    this.collection = Objects.requireNonNull(collection, "collection");
    this.keyExtractor = Objects.requireNonNull(keyExtractor, "keyExtractor");

    final PathNaming defaultNaming = PathNaming.defaultNaming();
    final KeyExtractor.KeyMetadata metadata = keyExtractor.metadata();

    final boolean hasSingleExpressionKey = metadata.isKeyDefined()
            && metadata.isExpression()
            && metadata.keys().size() == 1;

    if (hasSingleExpressionKey) {
        Path idProperty = Visitors.toPath(Iterables.getOnlyElement(metadata.keys()));
        this.pathNaming = new MongoPathNaming(idProperty, defaultNaming);
    } else {
        this.pathNaming = defaultNaming;
    }

    this.converter = Mongos.converter(this.pathNaming, collection.getCodecRegistry());
}
/**
 * Uses <a href="https://docs.mongodb.com/manual/reference/method/db.collection.replaceOne/">replaceOne</a> operation
 * with <a href="https://docs.mongodb.com/manual/reference/method/db.collection.bulkWrite/">bulkWrite</a>. Each value
 * is replaced by a filter on its {@code _id}, which is obtained through the key extractor. The driver's bulk
 * result is discarded; an unknown {@code WriteResult} is emitted for it instead.
 */
private <T> Publisher<WriteResult> update(StandardOperations.Update operation) {
    ReplaceOptions replaceOptions = new ReplaceOptions();
    if (operation.upsert()) {
        replaceOptions.upsert(operation.upsert());
    }

    // one replace request per value, matched on the extracted key
    List<ReplaceOneModel<Object>> replacements = operation.values().stream()
            .map(value -> new ReplaceOneModel<>(
                    new BsonDocument(Mongos.ID_FIELD_NAME, toBsonValue(keyExtractor.extract(value))),
                    value,
                    replaceOptions))
            .collect(Collectors.toList());

    Publisher<BulkWriteResult> bulkWrite = ((MongoCollection<Object>) collection).bulkWrite(replacements);
    return Flowable.fromPublisher(bulkWrite).map(ignored -> WriteResult.unknown());
}
/**
 * Opens a change stream on the collection, requesting full-document lookup on updates and
 * emitting elements of the collection's document class.
 *
 * @param operation the watch operation whose query is converted to a BSON filter
 * @return a publisher of the watched documents
 */
private <X> Publisher<X> watch(StandardOperations.Watch operation) {
    final MongoCollection<X> typedCollection = (MongoCollection<X>) this.collection;

    // NOTE(review): this document is handed to watch(...) as the only pipeline entry and is keyed
    // by "fullDocument" rather than wrapped in a $match stage -- confirm the server accepts it.
    final Bson pipelineEntry = new Document("fullDocument", toBsonFilter(operation.query()));

    return Flowable.fromPublisher(
            typedCollection.watch(Collections.singletonList(pipelineEntry))
                    .fullDocument(FullDocument.UPDATE_LOOKUP)
                    .withDocumentClass(typedCollection.getDocumentClass()));
}
/**
 * Resolves the collection for {@code entityClass}, creating it first when the database does not
 * list a collection with the derived name yet.
 *
 * <p>Both the listing of collection names and the optional creation block the calling thread.
 *
 * @param entityClass the entity class; its collection name is derived via {@code ContainerNaming.DEFAULT}
 * @return the collection configured with {@code entityClass} as document class and this resolver's codec registry
 */
@Override
public MongoCollection<?> resolve(Class<?> entityClass) {
    final String name = ContainerNaming.DEFAULT.name(entityClass);

    // already exists ?
    final boolean exists = Flowable.fromPublisher(database.listCollectionNames())
            .toList()
            .blockingGet()
            .contains(name);
    if (!exists) {
        // block until the creation has been acknowledged; the emitted value itself is not needed
        Flowable.fromPublisher(database.createCollection(name)).blockingFirst();
    }

    return database.getCollection(name)
            .withDocumentClass(entityClass)
            .withCodecRegistry(registry);
}
/**
 * Returns the {@code product} collection of the {@code products-demo} database, typed to {@code Product}.
 *
 * @return the typed product collection
 */
private MongoCollection<Product> getCollection() {
    final MongoCollection<Product> products = mongoClient
            .getDatabase("products-demo")
            .getCollection("product", Product.class);
    return products;
}
/**
 * Creates a publisher bound to the given collection and category.
 *
 * @param collection the news collection to store in this publisher
 * @param category the category to store in this publisher
 */
public DBPublisher(MongoCollection<News> collection, String category) {
    this.category = category;
    this.collection = collection;
}
/**
 * Wraps the given driver collection.
 *
 * @param collection the collection to store in this instance
 */
ReactiveMongoCollectionImpl(MongoCollection<T> collection) {
    // plain field assignment; no validation is performed here
    this.collection = collection;
}
/**
 * Creates an instance backed by the given collection.
 *
 * @param collection the collection to store in this instance
 */
protected DatabaseCollection(MongoCollection<Document> collection) {
    // plain field assignment; no validation is performed here
    this.collection = collection;
}