下面列出了怎么用com.mongodb.reactivestreams.client.AggregatePublisher的API类实例代码及写法,或者点击链接到github查看源代码。
public <T> AggregatePublisher<T> apply(AggregatePublisher<T> stream) {
AggregatePublisher<T> publisher = stream;
if (hint != null) {
publisher = publisher.hint(hint);
}
if (comment != null) {
publisher = publisher.comment(comment);
}
if (collation != null) {
publisher = publisher.collation(collation);
}
publisher.bypassDocumentValidation(bypassDocumentValidation);
publisher.allowDiskUse(allowDiskUse);
if (maxAwaitTime > 0) {
publisher.maxAwaitTime(maxAwaitTime, maxAwaitTimeUnit);
}
if (maxTime > 0) {
publisher.maxAwaitTime(maxTime, maxTimeUnit);
}
return publisher;
}
@Override
public Source<SearchNamespaceReportResult, NotUsed> generateNamespaceCountReport() {
final AggregatePublisher<Document> aggregatePublisher = collection.aggregate(
Collections.singletonList(
new Document("$group",
new Document(FIELD_ID, "$_namespace")
.append(PersistenceConstants.FIELD_COUNT, new Document("$sum", 1))
)
)
);
return Source.fromPublisher(aggregatePublisher)
.map(document -> {
final String namespace = document.get(FIELD_ID) != null
? document.get(FIELD_ID).toString()
: "NOT_MIGRATED";
final long count = Long.parseLong(document.get(PersistenceConstants.FIELD_COUNT).toString());
return new SearchNamespaceResultEntry(namespace, count);
})
.fold(new ArrayList<SearchNamespaceResultEntry>(), (list, entry) -> {
list.add(entry);
return list;
})
.map(SearchNamespaceReportResult::new);
}
private <T> AggregatePublisher<T> apply(AggregateOptions options, AggregatePublisher<T> aggregate) {
if (options == null) {
return aggregate;
}
return options.apply(aggregate);
}
private <D> AggregatePublisher<D> apply(AggregateOptions options, AggregatePublisher<D> publisher) {
if (options == null) {
return publisher;
}
return options.apply(publisher);
}
@Override
public Object executeQuery(QueryProvider queryProvider) {
// convert the pipelines by parsing the JSON strings
Iterator iterator = queryProvider.getPipelines().iterator();
int i = 0;
String collectionName = queryProvider.getCollectionName();
List<Bson> pipelineStages = new ArrayList<>();
while (iterator.hasNext()) {
String query = (String) iterator.next();
LOGGER.trace("Processing query string {} for pipeline stage {}", query, i++);
Bson document = BsonDocument.parse(query);
pipelineStages.add(document);
}
// run the pipeline and return a flux.
MongoCollection<Document> collection = mongoOperations.getCollection(collectionName);
AggregatePublisher<Document> aggregatePublisher = collection.aggregate(pipelineStages)
.allowDiskUse(queryProvider.isAllowDiskUse())
.maxTime(queryProvider.getMaxTimeMS(), MILLISECONDS);
Class methodReturnType = queryProvider.getMethodReturnType();
boolean isFlux = Flux.class.isAssignableFrom(methodReturnType);
boolean isMono = Mono.class.isAssignableFrom(methodReturnType);
boolean isFluxOrMono = isFlux || isMono;
if (!isFluxOrMono) {
throw new IllegalArgumentException("Method return type must be of Flux or Mono type");
}
Class<?> outputClass = queryProvider.getOutputClass();
if (isFlux) {
LOGGER.trace("Return type is Flux<{}>", outputClass);
Flux<Document> retval = Flux.from(aggregatePublisher);
if (outputClass != null) {
return adaptPipeline(queryProvider, outputClass, retval);
}
return retval;
}
else {
Mono<Document> mono = Mono.from(aggregatePublisher);
if (outputClass != null) {
LOGGER.trace("Return type is Mono<{}>", outputClass);
return adaptPipeline(queryProvider, outputClass, mono);
}
return mono;
}
}
@Override
public AggregatePublisher<Document> aggregate(final List<? extends Bson> pipeline) {
return aggregate(pipeline, Document.class);
}
@Override
public <TResult> AggregatePublisher<TResult> aggregate(final List<? extends Bson> pipeline, final Class<TResult> clazz) {
return new AggregatePublisherImpl<TResult>(wrapped.aggregate(pipeline, clazz));
}
@Override
public AggregatePublisher<Document> aggregate(final ClientSession clientSession, final List<? extends Bson> pipeline) {
return aggregate(clientSession, pipeline, Document.class);
}
@Override
public <TResult> AggregatePublisher<TResult> aggregate(final ClientSession clientSession, final List<? extends Bson> pipeline,
final Class<TResult> clazz) {
return new AggregatePublisherImpl<TResult>(wrapped.aggregate(clientSession.getWrapped(), pipeline, clazz));
}
@Override
public AggregatePublisher<Document> aggregate(final List<? extends Bson> pipeline) {
return aggregate(pipeline, Document.class);
}
@Override
public <TResult> AggregatePublisher<TResult> aggregate(final List<? extends Bson> pipeline, final Class<TResult> resultClass) {
return new AggregatePublisherImpl<TResult>(wrapped.aggregate(pipeline, resultClass));
}
@Override
public AggregatePublisher<Document> aggregate(final ClientSession clientSession, final List<? extends Bson> pipeline) {
return aggregate(clientSession, pipeline, Document.class);
}
@Override
public <TResult> AggregatePublisher<TResult> aggregate(final ClientSession clientSession, final List<? extends Bson> pipeline,
final Class<TResult> resultClass) {
notNull("clientSession", clientSession);
return new AggregatePublisherImpl<TResult>(wrapped.aggregate(clientSession.getWrapped(), pipeline, resultClass));
}