类com.mongodb.reactivestreams.client.AggregatePublisher源码实例Demo

下面列出了 com.mongodb.reactivestreams.client.AggregatePublisher 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: quarkus   文件: AggregateOptions.java
/**
 * Applies the options held by this object to the given aggregate publisher.
 *
 * <p>{@code hint}, {@code comment} and {@code collation} are only applied when they are
 * non-null. {@code bypassDocumentValidation} and {@code allowDiskUse} are always applied.
 * {@code maxAwaitTime} and {@code maxTime} are only applied when they are strictly positive.
 *
 * @param stream the publisher to configure
 * @param <T> the type of the items emitted by the publisher
 * @return the configured publisher
 */
public <T> AggregatePublisher<T> apply(AggregatePublisher<T> stream) {
    AggregatePublisher<T> publisher = stream;

    if (hint != null) {
        publisher = publisher.hint(hint);
    }
    if (comment != null) {
        publisher = publisher.comment(comment);
    }
    if (collation != null) {
        publisher = publisher.collation(collation);
    }
    // Keep the value returned by each fluent call, consistently with the calls above.
    publisher = publisher.bypassDocumentValidation(bypassDocumentValidation);
    publisher = publisher.allowDiskUse(allowDiskUse);
    if (maxAwaitTime > 0) {
        publisher = publisher.maxAwaitTime(maxAwaitTime, maxAwaitTimeUnit);
    }
    if (maxTime > 0) {
        // The max time must go through maxTime(...); routing it through maxAwaitTime(...)
        // would leave the max time unset and overwrite the max await time configured above.
        publisher = publisher.maxTime(maxTime, maxTimeUnit);
    }
    return publisher;
}
 
源代码2 项目: ditto   文件: MongoThingsSearchPersistence.java
/**
 * Groups the documents of {@code collection} by their {@code _namespace} field, counts each
 * group, and collects the per-namespace counts into a single report.
 *
 * <p>Groups whose id is {@code null} are reported under the namespace {@code "NOT_MIGRATED"}.
 *
 * @return a source emitting one {@code SearchNamespaceReportResult} built from all group entries
 */
@Override
public Source<SearchNamespaceReportResult, NotUsed> generateNamespaceCountReport() {
    // {$group: {<FIELD_ID>: "$_namespace", <FIELD_COUNT>: {$sum: 1}}}
    final Document groupSpec = new Document(FIELD_ID, "$_namespace")
            .append(PersistenceConstants.FIELD_COUNT, new Document("$sum", 1));
    final Document groupStage = new Document("$group", groupSpec);

    final AggregatePublisher<Document> aggregatePublisher =
            collection.aggregate(Collections.singletonList(groupStage));

    return Source.fromPublisher(aggregatePublisher)
            .map(doc -> {
                final Object rawNamespace = doc.get(FIELD_ID);
                final String namespace;
                if (rawNamespace == null) {
                    namespace = "NOT_MIGRATED";
                } else {
                    namespace = rawNamespace.toString();
                }
                final String rawCount = doc.get(PersistenceConstants.FIELD_COUNT).toString();
                return new SearchNamespaceResultEntry(namespace, Long.parseLong(rawCount));
            })
            .fold(new ArrayList<SearchNamespaceResultEntry>(), (entries, entry) -> {
                entries.add(entry);
                return entries;
            })
            .map(SearchNamespaceReportResult::new);
}
 
源代码3 项目: quarkus   文件: ReactiveMongoDatabaseImpl.java
/**
 * Applies the given options to the publisher, or returns the publisher untouched when no
 * options are supplied.
 *
 * @param options the options to apply, may be {@code null}
 * @param aggregate the publisher to configure
 * @return {@code aggregate} itself when {@code options} is {@code null}, otherwise the result
 *         of {@code options.apply(aggregate)}
 */
private <T> AggregatePublisher<T> apply(AggregateOptions options, AggregatePublisher<T> aggregate) {
    return options != null ? options.apply(aggregate) : aggregate;
}
 
源代码4 项目: quarkus   文件: ReactiveMongoCollectionImpl.java
/**
 * Returns the publisher configured with the given options; a {@code null} options argument
 * leaves the publisher as it is.
 *
 * @param options the options to apply, may be {@code null}
 * @param publisher the publisher to configure
 * @return the publisher returned by {@code options.apply(publisher)}, or {@code publisher}
 *         when there are no options
 */
private <D> AggregatePublisher<D> apply(AggregateOptions options, AggregatePublisher<D> publisher) {
    if (options != null) {
        return options.apply(publisher);
    }
    return publisher;
}
 
/**
 * Parses the pipeline stages supplied by the query provider, runs them as an aggregation on
 * the provider's collection and returns the result as a {@code Flux} or a {@code Mono},
 * depending on the method return type reported by the provider.
 *
 * <p>When the provider reports a non-null output class, the reactive result is passed through
 * {@code adaptPipeline} before being returned.
 *
 * @param queryProvider supplies the pipeline JSON strings, collection name and query settings
 * @return a {@code Flux} or {@code Mono}, possibly adapted by {@code adaptPipeline}
 * @throws IllegalArgumentException if the method return type is neither Flux nor Mono
 */
@Override
public Object executeQuery(QueryProvider queryProvider) {

  // Each pipeline entry is a JSON string that is parsed into one BSON stage.
  Iterator<?> stageJsonIterator = queryProvider.getPipelines().iterator();
  String collectionName = queryProvider.getCollectionName();
  List<Bson> pipelineStages = new ArrayList<>();
  for (int stageIndex = 0; stageJsonIterator.hasNext(); stageIndex++) {
    String stageJson = (String) stageJsonIterator.next();
    LOGGER.trace("Processing query string {} for pipeline stage {}", stageJson, stageIndex);
    pipelineStages.add(BsonDocument.parse(stageJson));
  }

  // Build the aggregation publisher for the target collection.
  MongoCollection<Document> collection = mongoOperations.getCollection(collectionName);
  AggregatePublisher<Document> aggregatePublisher = collection.aggregate(pipelineStages)
                                                              .allowDiskUse(queryProvider.isAllowDiskUse())
                                                              .maxTime(queryProvider.getMaxTimeMS(), MILLISECONDS);

  // Only reactive return types are supported.
  Class<?> methodReturnType = queryProvider.getMethodReturnType();
  boolean returnsFlux = Flux.class.isAssignableFrom(methodReturnType);
  boolean returnsMono = Mono.class.isAssignableFrom(methodReturnType);
  if (!returnsFlux && !returnsMono) {
    throw new IllegalArgumentException("Method return type must be of Flux or Mono type");
  }

  Class<?> outputClass = queryProvider.getOutputClass();
  if (returnsFlux) {
    LOGGER.trace("Return type is Flux<{}>", outputClass);
    Flux<Document> flux = Flux.from(aggregatePublisher);
    return outputClass == null ? flux : adaptPipeline(queryProvider, outputClass, flux);
  }

  Mono<Document> mono = Mono.from(aggregatePublisher);
  if (outputClass == null) {
    return mono;
  }
  LOGGER.trace("Return type is Mono<{}>", outputClass);
  return adaptPipeline(queryProvider, outputClass, mono);
}
 
/**
 * Aggregates with the given pipeline, delegating to the overload that takes a result class
 * and passing {@code Document.class}.
 *
 * @param pipeline the aggregation pipeline stages
 * @return a publisher of {@code Document} results
 */
@Override
public AggregatePublisher<Document> aggregate(final List<? extends Bson> pipeline) {
    final Class<Document> documentClass = Document.class;
    return aggregate(pipeline, documentClass);
}
 
/**
 * Aggregates with the given pipeline by calling {@code wrapped.aggregate(pipeline, clazz)} and
 * wrapping what it returns in an {@code AggregatePublisherImpl}.
 *
 * @param pipeline the aggregation pipeline stages
 * @param clazz the class of the result items
 * @param <TResult> the type of the result items
 * @return a publisher wrapping the aggregation of the underlying object
 */
@Override
public <TResult> AggregatePublisher<TResult> aggregate(final List<? extends Bson> pipeline, final Class<TResult> clazz) {
    final AggregatePublisher<TResult> publisher =
            new AggregatePublisherImpl<TResult>(wrapped.aggregate(pipeline, clazz));
    return publisher;
}
 
/**
 * Aggregates with the given pipeline and client session, delegating to the overload that
 * takes a result class and passing {@code Document.class}.
 *
 * @param clientSession the client session passed on to the delegate overload
 * @param pipeline the aggregation pipeline stages
 * @return a publisher of {@code Document} results
 */
@Override
public AggregatePublisher<Document> aggregate(final ClientSession clientSession, final List<? extends Bson> pipeline) {
    final Class<Document> documentClass = Document.class;
    return aggregate(clientSession, pipeline, documentClass);
}
 
/**
 * Aggregates with the given pipeline and client session by calling {@code wrapped.aggregate}
 * with the session's wrapped instance and wrapping what it returns in an
 * {@code AggregatePublisherImpl}.
 *
 * @param clientSession the client session whose {@code getWrapped()} value is passed on
 * @param pipeline the aggregation pipeline stages
 * @param clazz the class of the result items
 * @param <TResult> the type of the result items
 * @return a publisher wrapping the aggregation of the underlying object
 */
@Override
public <TResult> AggregatePublisher<TResult> aggregate(final ClientSession clientSession, final List<? extends Bson> pipeline,
                                                       final Class<TResult> clazz) {
    final AggregatePublisher<TResult> publisher =
            new AggregatePublisherImpl<TResult>(wrapped.aggregate(clientSession.getWrapped(), pipeline, clazz));
    return publisher;
}
 
/**
 * Runs the given aggregation pipeline with {@code Document} as the result class, by
 * delegating to the overload that takes an explicit result class.
 *
 * @param pipeline the aggregation pipeline stages
 * @return a publisher of {@code Document} results
 */
@Override
public AggregatePublisher<Document> aggregate(final List<? extends Bson> pipeline) {
    final Class<Document> defaultResultClass = Document.class;
    return aggregate(pipeline, defaultResultClass);
}
 
/**
 * Runs the given aggregation pipeline by calling {@code wrapped.aggregate(pipeline, resultClass)}
 * and wrapping what it returns in an {@code AggregatePublisherImpl}.
 *
 * @param pipeline the aggregation pipeline stages
 * @param resultClass the class of the result items
 * @param <TResult> the type of the result items
 * @return a publisher wrapping the aggregation of the underlying object
 */
@Override
public <TResult> AggregatePublisher<TResult> aggregate(final List<? extends Bson> pipeline, final Class<TResult> resultClass) {
    final AggregatePublisher<TResult> publisher =
            new AggregatePublisherImpl<TResult>(wrapped.aggregate(pipeline, resultClass));
    return publisher;
}
 
/**
 * Runs the given aggregation pipeline in the given client session with {@code Document} as
 * the result class, by delegating to the overload that takes an explicit result class.
 *
 * @param clientSession the client session passed on to the delegate overload
 * @param pipeline the aggregation pipeline stages
 * @return a publisher of {@code Document} results
 */
@Override
public AggregatePublisher<Document> aggregate(final ClientSession clientSession, final List<? extends Bson> pipeline) {
    final Class<Document> defaultResultClass = Document.class;
    return aggregate(clientSession, pipeline, defaultResultClass);
}
 
/**
 * Runs the given aggregation pipeline in the given client session. The session is first
 * checked with {@code notNull}; then {@code wrapped.aggregate} is called with the session's
 * wrapped instance and its result is wrapped in an {@code AggregatePublisherImpl}.
 *
 * @param clientSession the client session, checked with {@code notNull}
 * @param pipeline the aggregation pipeline stages
 * @param resultClass the class of the result items
 * @param <TResult> the type of the result items
 * @return a publisher wrapping the aggregation of the underlying object
 */
@Override
public <TResult> AggregatePublisher<TResult> aggregate(final ClientSession clientSession, final List<? extends Bson> pipeline,
                                                       final Class<TResult> resultClass) {
    // Validate the session before touching it.
    notNull("clientSession", clientSession);
    final AggregatePublisher<TResult> publisher =
            new AggregatePublisherImpl<TResult>(wrapped.aggregate(clientSession.getWrapped(), pipeline, resultClass));
    return publisher;
}
 
 类所在包
 类方法
 同包方法