Source code examples for the class com.mongodb.reactivestreams.client.MongoCollection

The following examples show how to use the com.mongodb.reactivestreams.client.MongoCollection API in practice; each project link leads to the full source code on GitHub.
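Before the project-specific snippets, here is a minimal, self-contained sketch of the basic usage pattern: obtain a MongoCollection from a MongoClient and drive its Reactive Streams publishers with a subscriber such as RxJava's Flowable, which most of the examples on this page also use. The connection string, database name, collection name and class name below are placeholders, not taken from any of the listed projects.

import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import io.reactivex.Flowable;
import org.bson.Document;

public class MongoCollectionQuickstart {

    public static void main(String[] args) {
        // Placeholder connection string; adjust to your environment.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> collection = client.getDatabase("demo")
                                                         .getCollection("news");

            // Driver methods return cold Publishers; nothing executes until subscription.
            Flowable.fromPublisher(collection.insertOne(new Document("category", "tech")))
                    .blockingSubscribe();

            long count = Flowable.fromPublisher(collection.countDocuments())
                                 .blockingFirst();
            System.out.println("documents in 'news': " + count);
        }
    }
}

The blocking RxJava calls are only for demonstration; in application code you would normally keep the pipeline asynchronous.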

@Override
public Publisher<NewsLetter> createPublisher(long elements) {
    MongoCollection<News> collection = mongoClient().getDatabase("news")
                                                    .getCollection("news", News.class);
    int period = elements > 0 ? (int)(1000 / elements) : 1;
    prepareItemsInDatabase(elements);

    Publisher<NewsLetter> newsServicePublisher = new NewsServicePublisher(smp ->
            new ScheduledPublisher<>(
                    () -> new NewsPreparationOperator(
                            new DBPublisher(
                                    collection,
                                    "tech"
                            ),
                            "Some Digest"
                    ),
                    period == 0 ? 1 : period, TimeUnit.MILLISECONDS
            ).subscribe(smp)
    );

    newsServicePublisher = Flowable.fromPublisher(newsServicePublisher)
                                   .take(elements);

    return newsServicePublisher;
}
 
@Override
public Publisher<NewsLetter> createFailedPublisher() {
    MongoCollection<News> collection = mongoClient().getDatabase("news")
                                                    .getCollection("news", News.class);
    WithEmbeddedMongo.tearDownMongo();
    return new NewsServicePublisher(smp ->
            new ScheduledPublisher<>(
                    () -> new NewsPreparationOperator(
                            new DBPublisher(
                                    collection,
                                    "tech"
                            ),
                            "Some Digest"
                    ),
                    1, TimeUnit.MILLISECONDS
            ).subscribe(smp)
    );
}
 
private void prepareItemsInDatabase(long elements) {
    if (elements <= 0) {
        return;
    }

    MongoCollection<News> collection = mongoClient().getDatabase("news")
                                                    .getCollection("news", News.class);

    Flowable<Success> successFlowable = Flowable.fromPublisher(collection.drop())
                                                .ignoreElements()
                                                .andThen(Flowable.rangeLong(0L, elements)
                                                                 .map(l -> NewsHarness.generate())
                                                                 .buffer(500, TimeUnit.MILLISECONDS)
                                                                 .flatMap(collection::insertMany));

    if (elements == Long.MAX_VALUE || elements == Integer.MAX_VALUE) {
        successFlowable.subscribe();
    }
    else {
        successFlowable.blockingSubscribe();
    }
}
 
Example 4  Project: ditto  File: MongoOpsUtil.java
private static Source<Optional<Throwable>, NotUsed> doDeleteByFilter(final MongoCollection<Document> collection,
        final Bson filter) {
    // https://stackoverflow.com/a/33164008
    // claims unordered bulk ops halve MongoDB load
    final List<WriteModel<Document>> writeModel =
            Collections.singletonList(new DeleteManyModel<>(filter));
    final BulkWriteOptions options = new BulkWriteOptions().ordered(false);
    return Source.fromPublisher(collection.bulkWrite(writeModel, options))
            .map(result -> {
                if (LOGGER.isDebugEnabled()) {
                    // in contrast to Bson, BsonDocument has meaningful toString()
                    final BsonDocument filterBsonDoc = BsonUtil.toBsonDocument(filter);
                    LOGGER.debug("Deleted <{}> documents from collection <{}>. Filter was <{}>.",
                            result.getDeletedCount(), collection.getNamespace(), filterBsonDoc);
                }
                return Optional.<Throwable>empty();
            })
            .recoverWithRetries(RETRY_ATTEMPTS, new PFBuilder<Throwable, Source<Optional<Throwable>, NotUsed>>()
                    .matchAny(throwable -> Source.single(Optional.of(throwable)))
                    .build());
}
 
Example 5  Project: ditto  File: MongoTimestampPersistence.java
/**
 * Creates the capped collection {@code collectionName} in the given {@code database} if it doesn't exist yet.
 *
 * @param database The database to use.
 * @param collectionName The name of the capped collection that should be created.
 * @param cappedCollectionSizeInBytes The size in bytes of the collection that should be created.
 * @param materializer The actor materializer to pre-materialize the restart source.
 * @return the created or retrieved collection.
 */
private static Source<MongoCollection, NotUsed> createOrGetCappedCollection(
        final MongoDatabase database,
        final String collectionName,
        final long cappedCollectionSizeInBytes,
        final ActorMaterializer materializer) {

    final Source<Success, NotUsed> createCollectionSource =
            repeatableCreateCappedCollectionSource(database, collectionName, cappedCollectionSizeInBytes);

    final Source<MongoCollection, NotUsed> infiniteCollectionSource =
            createCollectionSource.map(success -> database.getCollection(collectionName))
                    .flatMapConcat(Source::repeat);

    final Source<MongoCollection, NotUsed> restartSource =
            RestartSource.withBackoff(BACKOFF_MIN, BACKOFF_MAX, 1.0, () -> infiniteCollectionSource);

    // pre-materialize source with BroadcastHub so that a successfully obtained capped collection is reused
    // until the stream fails, whereupon it gets recreated with backoff.
    return restartSource.runWith(BroadcastHub.of(MongoCollection.class, 1), materializer);
}
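
The helper repeatableCreateCappedCollectionSource referenced above is not shown on this page. The sketch below is an assumption of what such a helper could look like rather than Ditto's actual implementation: it creates the capped collection via CreateCollectionOptions and maps MongoDB's "collection already exists" error (NamespaceExists, error code 48) to a successful element, so the surrounding restart source can safely re-run it. It assumes akka.stream.javadsl.Source, akka.japi.pf.PFBuilder, com.mongodb.MongoCommandException and com.mongodb.client.model.CreateCollectionOptions are available, as in the other Ditto snippets here.

private static Source<Success, NotUsed> repeatableCreateCappedCollectionSource(
        final MongoDatabase database,
        final String collectionName,
        final long cappedCollectionSizeInBytes) {

    final CreateCollectionOptions collectionOptions = new CreateCollectionOptions()
            .capped(true)
            .sizeInBytes(cappedCollectionSizeInBytes);

    return Source.fromPublisher(database.createCollection(collectionName, collectionOptions))
            .recoverWithRetries(1, new PFBuilder<Throwable, Source<Success, NotUsed>>()
                    // a collection that already exists is fine: map NamespaceExists (code 48) to success
                    .match(MongoCommandException.class,
                            error -> error.getErrorCode() == 48,
                            error -> Source.single(Success.SUCCESS))
                    .build());
}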
 
@Test
public void mustNotOverwriteExistingDocuments() throws JsonProcessingException {
  String employeeCollection = RandomStringUtils.randomAlphabetic(10);
  String orgArchiveColl = RandomStringUtils.randomAlphabetic(10);
  List<Employee> originalEmployees = addEmployeeDocuments(employeeCollection, EMPLOYEE_DOCS);
  MongoCollection<Document> employeeColl = mongoTemplate.getCollection(employeeCollection);
  validateCount(employeeCollection, originalEmployees.size());
  List<OrgArchiveEntry> orgArchiveEntries = addOrgArchiveEntries(orgArchiveColl, ORG_ARCHIVE_DOCS);
  MongoCollection<Document> orgArchiveCollection = mongoTemplate.getCollection(orgArchiveColl);
  validateCount(orgArchiveColl, orgArchiveEntries.size());
  Document index = Document.parse("{'fiscalYear': 1, 'dept': 1}");
  IndexDefinition def = new CompoundIndexDefinition(index).unique();
  mongoTemplate.indexOps(orgArchiveColl).ensureIndex(def).block();
  zooEmployeeRepository.updateOrgArchiveInsertOnly(employeeCollection, orgArchiveColl).block();
  validateCount(orgArchiveColl, orgArchiveEntries.size() + 2);
  Query query = new Query(Criteria.where("fiscalYear").is(2019));
  List<OrgArchiveEntry> newArchiveEntries = mongoTemplate.find(query, OrgArchiveEntry.class, orgArchiveColl)
                                                         .collectList().block();
  assertNotNull(newArchiveEntries);
  assertEquals(newArchiveEntries.size(), 2);
}
 
Example 7  Project: pulsar  File: MongoSinkTest.java
@BeforeMethod
public void setUp() {

    map = TestHelper.createMap(true);

    mockRecord = mock(Record.class);
    mockSinkContext = mock(SinkContext.class);
    mockMongoClient = mock(MongoClient.class);
    mockMongoDb = mock(MongoDatabase.class);
    mockMongoColl = mock(MongoCollection.class);
    mockPublisher = mock(Publisher.class);
    sink = new MongoSink(() -> mockMongoClient);


    when(mockMongoClient.getDatabase(anyString())).thenReturn(mockMongoDb);
    when(mockMongoDb.getCollection(anyString())).thenReturn(mockMongoColl);
    when(mockMongoColl.insertMany(any())).thenReturn(mockPublisher);
}
 
Example 8  Project: pulsar  File: MongoSourceTest.java
@BeforeMethod
public void setUp() {

    map = TestHelper.createMap(true);

    mockSourceContext = mock(SourceContext.class);
    mockMongoClient = mock(MongoClient.class);
    mockMongoDb = mock(MongoDatabase.class);
    mockMongoColl = mock(MongoCollection.class);
    mockPublisher = mock(ChangeStreamPublisher.class);

    source = new MongoSource(() -> mockMongoClient);

    when(mockMongoClient.getDatabase(anyString())).thenReturn(mockMongoDb);
    when(mockMongoDb.getCollection(anyString())).thenReturn(mockMongoColl);
    when(mockMongoColl.watch()).thenReturn(mockPublisher);
    when(mockPublisher.batchSize(anyInt())).thenReturn(mockPublisher);
    when(mockPublisher.fullDocument(any())).thenReturn(mockPublisher);

    doAnswer((invocation) -> {
        subscriber = invocation.getArgument(0, Subscriber.class);
        return null;
    }).when(mockPublisher).subscribe(any());
}
 
@Before
public void setUp() {

	Mono<MongoCollection<Document>> recreateCollection = operations.collectionExists(Person.class) //
			.flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists)) //
			.then(operations.createCollection(Person.class, CollectionOptions.empty() //
					.size(1024 * 1024) //
					.maxDocuments(100) //
					.capped()));

	StepVerifier.create(recreateCollection).expectNextCount(1).verifyComplete();

	repository.saveAll(Flowable.just(new Person("Walter", "White", 50), //
			new Person("Skyler", "White", 45), //
			new Person("Saul", "Goodman", 42), //
			new Person("Jesse", "Pinkman", 27))) //
			.test() //
			.awaitCount(4) //
			.assertNoErrors() //
			.awaitTerminalEvent();
}
 
@Before
public void setUp() {

	Mono<MongoCollection<Document>> recreateCollection = operations.collectionExists(Person.class) //
			.flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists)) //
			.then(operations.createCollection(Person.class, CollectionOptions.empty() //
					.size(1024 * 1024) //
					.maxDocuments(100) //
					.capped()));

	StepVerifier.create(recreateCollection).expectNextCount(1).verifyComplete();

	Flux<Person> insertAll = operations.insertAll(Flux.just(new Person("Walter", "White", 50), //
			new Person("Skyler", "White", 45), //
			new Person("Saul", "Goodman", 42), //
			new Person("Jesse", "Pinkman", 27)).collectList());

	StepVerifier.create(insertAll).expectNextCount(4).verifyComplete();
}
 
Example 11  Project: immutables  File: IdAttributeTest.java
/**
 * Test that the {@code _id} attribute is persisted instead of {@code id}.
 */
@Test
void idAttribute() {
  StringHolderRepository repository = new StringHolderRepository(resource.backend());
  ImmutableStringHolder holder = TypeHolder.StringHolder.generator().get().withId("id1");
  repository.insertAll(Arrays.asList(holder, holder.withId("id2")));

  MongoCollection<BsonDocument> collection = resource.collection(TypeHolder.StringHolder.class)
          .withDocumentClass(BsonDocument.class);


  List<BsonDocument> docs = Flowable.fromPublisher(collection.find()).toList().blockingGet();

  Checkers.check(docs).hasSize(2);

  // has _id attribute
  Checkers.check(docs.stream().map(BsonDocument::keySet).flatMap(Collection::stream).collect(Collectors.toSet())).has("_id");
  // only '_id' (with underscore, which is Mongo-specific) is present; the collected documents have no 'id' attribute
  Checkers.check(docs.stream().map(BsonDocument::keySet).flatMap(Collection::stream).collect(Collectors.toSet())).not().has("id");
  Checkers.check(docs.stream().map(d -> d.get("_id").asString().getValue()).collect(Collectors.toList())).hasContentInAnyOrder("id1", "id2");

  // using repository
  Checkers.check(repository.findAll().fetch().stream().map(TypeHolder.StringHolder::id).collect(Collectors.toList())).hasContentInAnyOrder("id1", "id2");
}
 
private Source<List<Throwable>, NotUsed> purge(final MongoPersistenceOperationsSelection selection) {
    final MongoCollection<Document> collection = db.getCollection(selection.getCollectionName());
    if (selection.isEntireCollection()) {
        return MongoOpsUtil.drop(collection);
    } else {
        return MongoOpsUtil.deleteByFilter(collection, selection.getFilter());
    }
}
 
Example 13  Project: ditto  File: MongoOpsUtil.java
private static Source<Optional<Throwable>, NotUsed> doDrop(final MongoCollection<Document> collection) {
    return Source.fromPublisher(collection.drop())
            .map(result -> {
                LOGGER.debug("Successfully dropped collection <{}>.", collection.getNamespace());
                return Optional.<Throwable>empty();
            })
            .recoverWithRetries(RETRY_ATTEMPTS, new PFBuilder<Throwable, Source<Optional<Throwable>, NotUsed>>()
                    .matchAny(throwable -> Source.single(Optional.of(throwable)))
                    .build());
}
 
Example 14  Project: ditto  File: MongoTimestampPersistence.java
/**
 * Creates a new initialized instance.
 *
 * @param collectionName The name of the collection.
 * @param mongoClient the client wrapper holding the connection information.
 * @param materializer an actor materializer to materialize the restart-source of the timestamp collection.
 * @return a new initialized instance.
 */
public static MongoTimestampPersistence initializedInstance(final String collectionName,
        final DittoMongoClient mongoClient, final ActorMaterializer materializer) {
    final Source<MongoCollection, NotUsed> collectionSource =
            createOrGetCappedCollection(mongoClient.getDefaultDatabase(), collectionName,
                    MIN_CAPPED_COLLECTION_SIZE_IN_BYTES, materializer);

    return new MongoTimestampPersistence(collectionSource);
}
 
Example 15  Project: ditto  File: MongoReadJournal.java
private Source<List<String>, NotUsed> listPidsInJournal(final MongoCollection<Document> journal,
        final String lowerBound, final int batchSize, final ActorMaterializer mat, final Duration maxBackOff,
        final int maxRestarts) {

    return unfoldBatchedSource(lowerBound, mat, Function.identity(), actualStart ->
            listJournalPidsAbove(journal, actualStart, batchSize, maxBackOff, maxRestarts)
    );
}
 
Example 16  Project: ditto  File: MongoReadJournal.java
private Source<List<Document>, NotUsed> listNewestSnapshots(final MongoCollection<Document> snapshotStore,
        final String lowerBound,
        final int batchSize,
        final ActorMaterializer mat,
        final String... snapshotFields) {

    return this.unfoldBatchedSource(lowerBound,
            mat,
            SnapshotBatch::getMaxPid,
            actualStart -> listNewestActiveSnapshotsByBatch(snapshotStore, actualStart, batchSize, snapshotFields))
            .mapConcat(x -> x)
            .map(SnapshotBatch::getItems);
}
 
Example 17  Project: ditto  File: MongoReadJournal.java
private Source<String, NotUsed> listJournalPidsAbove(final MongoCollection<Document> journal, final String start,
        final int batchSize, final Duration maxBackOff, final int maxRestarts) {

    final List<Bson> pipeline = new ArrayList<>(5);
    // optional match stage
    if (!start.isEmpty()) {
        pipeline.add(Aggregates.match(Filters.gt(PROCESSOR_ID, start)));
    }

    // sort stage
    pipeline.add(Aggregates.sort(Sorts.ascending(PROCESSOR_ID)));

    // limit stage. It should come before group stage or MongoDB would scan the entire journal collection.
    pipeline.add(Aggregates.limit(batchSize));

    // group stage
    pipeline.add(Aggregates.group("$" + PROCESSOR_ID));

    // sort stage 2 -- order after group stage is not defined
    pipeline.add(Aggregates.sort(Sorts.ascending(ID)));

    final Duration minBackOff = Duration.ofSeconds(1L);
    final double randomFactor = 0.1;

    return RestartSource.onFailuresWithBackoff(minBackOff, maxBackOff, randomFactor, maxRestarts,
            () -> Source.fromPublisher(journal.aggregate(pipeline))
                    .filter(document -> document.containsKey(ID))
                    .map(document -> document.getString(ID)));
}
 
Example 18  Project: ditto  File: MongoTimestampPersistenceIT.java
@Test
public void ensureCollectionIsCapped() throws Exception {
    final MongoCollection<Document> collection =
            syncPersistence.getCollection().runWith(Sink.head(), materializer).toCompletableFuture().get();

    runBlocking(syncPersistence.setTimestamp(Instant.now()));
    runBlocking(syncPersistence.setTimestamp(Instant.now()));

    assertThat(runBlocking(Source.fromPublisher(collection.count()))).containsExactly(1L);
}
 
Example 19  Project: ditto  File: MongoThingsSearchPersistence.java
private MongoThingsSearchPersistence(
        final MongoCollection<Document> collection,
        final LoggingAdapter log,
        final IndexInitializer indexInitializer,
        final Duration maxQueryTime,
        final MongoHints hints) {

    this.collection = collection;
    this.log = log;
    this.indexInitializer = indexInitializer;
    this.maxQueryTime = maxQueryTime;
    this.hints = hints;
}
 
private void dropCollectionWithBackoff(final MongoCollection<Document> collection) {
    RuntimeException lastException = null;
    for (int i = 0; i < 20; ++i) {
        try {
            waitFor(Source.fromPublisher(collection.drop()));
            return;
        } catch (final RuntimeException e) {
            lastException = e;
            backoff();
        }
    }
    throw lastException;
}
 
private Maybe<Document> findUserByUsername(String username) {
    MongoCollection<Document> usersCol = this.mongoClient.getDatabase(this.configuration.getDatabase()).getCollection(this.configuration.getUsersCollection());
    String rawQuery = this.configuration.getFindUserByUsernameQuery().replaceAll("\\?", username);
    String jsonQuery = convertToJsonString(rawQuery);
    BsonDocument query = BsonDocument.parse(jsonQuery);
    return Observable.fromPublisher(usersCol.find(query).first()).firstElement();
}
 
@Override
public void afterPropertiesSet() throws Exception {
    Observable.fromPublisher(mongoDatabase.createCollection("users")).blockingFirst();
    MongoCollection<Document> collection = mongoDatabase.getCollection("users");
    Document doc = new Document("username", "bob").append("password", "bobspassword");
    Observable.fromPublisher(collection.insertOne(doc)).blockingFirst();
}
 
Example 23  Project: immutables  File: MongoSession.java
MongoSession(MongoCollection<?> collection, KeyExtractor keyExtractor) {
  this.collection = Objects.requireNonNull(collection, "collection");
  this.keyExtractor = Objects.requireNonNull(keyExtractor, "keyExtractor");

  PathNaming pathNaming = PathNaming.defaultNaming();
  KeyExtractor.KeyMetadata metadata = keyExtractor.metadata();
  if (metadata.isKeyDefined() && metadata.isExpression() && metadata.keys().size() == 1) {
    Path idProperty = Visitors.toPath(Iterables.getOnlyElement(metadata.keys()));
    pathNaming = new MongoPathNaming(idProperty, pathNaming);
  }
  this.pathNaming = pathNaming;
  this.converter = Mongos.converter(this.pathNaming, collection.getCodecRegistry());
}
 
Example 24  Project: immutables  File: MongoSession.java
/**
 * Uses <a href="https://docs.mongodb.com/manual/reference/method/db.collection.replaceOne/">replaceOne</a> operation
 * with <a href="https://docs.mongodb.com/manual/reference/method/db.collection.bulkWrite/">bulkWrite</a>. Currently the object
 * has to be converted to a BsonDocument in order to extract the {@code _id} attribute.
 */
private <T> Publisher<WriteResult> update(StandardOperations.Update operation) {
  ReplaceOptions options = new ReplaceOptions();
  if (operation.upsert()) {
    options.upsert(operation.upsert());
  }

  List<ReplaceOneModel<Object>> docs =  operation.values().stream()
          .map(value -> new ReplaceOneModel<>(new BsonDocument(Mongos.ID_FIELD_NAME, toBsonValue(keyExtractor.extract(value))), value, options))
          .collect(Collectors.toList());

  Publisher<BulkWriteResult> publisher = ((MongoCollection<Object>) collection).bulkWrite(docs);
  return Flowable.fromPublisher(publisher).map(x -> WriteResult.unknown());
}
 
Example 25  Project: immutables  File: MongoSession.java
private <X> Publisher<X> watch(StandardOperations.Watch operation) {
  final MongoCollection<X> collection = (MongoCollection<X>) this.collection;
  final Bson filter = new Document("fullDocument", toBsonFilter(operation.query()));
  return Flowable.fromPublisher(collection.watch(Collections.singletonList(filter))
          .fullDocument(FullDocument.UPDATE_LOOKUP)
          .withDocumentClass(collection.getDocumentClass()));

}
 
Example 26  Project: immutables  File: BackendResource.java
@Override
public MongoCollection<?> resolve(Class<?> entityClass) {
  final String name = ContainerNaming.DEFAULT.name(entityClass);
  // create the collection if it does not exist yet
  if (!Flowable.fromPublisher(database.listCollectionNames()).toList().blockingGet().contains(name)) {
    Flowable.fromPublisher(database.createCollection(name)).blockingFirst();
  }

  return database.getCollection(name).withDocumentClass(entityClass).withCodecRegistry(registry);
}
 
private MongoCollection<Product> getCollection() {
    return mongoClient
            .getDatabase("products-demo")
            .getCollection("product", Product.class);
}
 
public DBPublisher(MongoCollection<News> collection, String category) {
    this.collection = collection;
    this.category = category;
}
 
Example 29  Project: quarkus  File: ReactiveMongoCollectionImpl.java
ReactiveMongoCollectionImpl(MongoCollection<T> collection) {
    this.collection = collection;
}
 
Example 30  Project: Shadbot  File: DatabaseCollection.java
protected DatabaseCollection(MongoCollection<Document> collection) {
    this.collection = collection;
}
 