下面列出了 com.mongodb.client.model.Accumulators API 类的用法示例代码，也可以点击链接到 GitHub 查看完整源代码。
/**
 * Groups the documents matching {@code match} by {@code field}, keeps only the groups that contain
 * at least {@code minCount} documents, and returns them ordered by document count, highest first.
 *
 * @param collectionName name of the collection to aggregate
 * @param match          filter applied before grouping
 * @param field          name of the field to group by (without the leading {@code $})
 * @param minCount       minimum document count a group needs to be included ({@code $gte})
 * @return insertion-ordered map from group value to document count, in descending count order
 */
public LinkedHashMap<String, Integer> sortMap(String collectionName, Document match, String field, int minCount) {
    AggregateIterable<Document> aggregate = getDB().getCollection(collectionName).aggregate(
            Arrays.asList(
                    match(match)
                    , group("$" + field, Accumulators.sum("_count", 1))
                    , match(new Document("_count", new Document("$gte", minCount)))
                    , sort(new Document("_count", -1))
            )
    );
    LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
    // try-with-resources releases the server-side cursor even when reading a result throws
    // (e.g. a ClassCastException from getString below).
    try (MongoCursor<Document> iterator = aggregate.iterator()) {
        while (iterator.hasNext()) {
            Document next = iterator.next();
            // NOTE(review): getString assumes the grouped field holds string values.
            map.put(next.getString("_id"), next.getInteger("_count"));
        }
    }
    return map;
}
private Single<Map<Object, Object>> usersStatusRepartition(AnalyticsQuery query) {
    // Counted as "disabled": enabled == false.
    BasicDBObject isDisabled = new BasicDBObject("$eq", Arrays.asList("$enabled", false));
    // Counted as "locked": accountNonLocked == false and accountLockedUntil is not in the past.
    BasicDBObject isLocked = new BasicDBObject("$and", Arrays.asList(
            new BasicDBObject("$eq", Arrays.asList("$accountNonLocked", false)),
            new BasicDBObject("$gte", Arrays.asList("$accountLockedUntil", new Date()))));
    // Counted as "inactive": loggedAt is 90 days old or older.
    Date inactivityCutoff = new Date(Instant.now().minus(90, ChronoUnit.DAYS).toEpochMilli());
    BasicDBObject isInactive = new BasicDBObject("$lte", Arrays.asList("$loggedAt", inactivityCutoff));

    return Observable.fromPublisher(usersCollection.aggregate(
            Arrays.asList(
                    Aggregates.match(and(eq(FIELD_REFERENCE_TYPE, DOMAIN.name()), eq(FIELD_REFERENCE_ID, query.getDomain()))),
                    Aggregates.group(
                            new BasicDBObject("_id", query.getField()),
                            Accumulators.sum("total", 1),
                            Accumulators.sum("disabled", new BasicDBObject("$cond", Arrays.asList(isDisabled, 1, 0))),
                            Accumulators.sum("locked", new BasicDBObject("$cond", Arrays.asList(isLocked, 1, 0))),
                            Accumulators.sum("inactive", new BasicDBObject("$cond", Arrays.asList(isInactive, 1, 0)))
                    )
            )))
            .map(doc -> {
                // "active" = total minus every user counted in one of the non-active buckets.
                // NOTE(review): a user matching several buckets is subtracted several times.
                long nonActive = ((Number) doc.get("disabled")).longValue()
                        + ((Number) doc.get("locked")).longValue()
                        + ((Number) doc.get("inactive")).longValue();
                long active = ((Number) doc.get("total")).longValue() - nonActive;
                Map<Object, Object> repartition = new HashMap<>();
                repartition.put("active", active);
                // Copy the per-bucket counters, leaving out the group key and the grand total.
                repartition.putAll(doc.entrySet()
                        .stream()
                        .filter(entry -> !"_id".equals(entry.getKey()) && !"total".equals(entry.getKey()))
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
                return repartition;
            })
            .first(Collections.emptyMap());
}
private Single<Map<Object, Object>> registrationsStatusRepartition(AnalyticsQuery query) {
    // Counted as "completed": registrationCompleted == true.
    BasicDBObject isCompleted = new BasicDBObject("$eq", Arrays.asList("$registrationCompleted", true));

    return Observable.fromPublisher(usersCollection.aggregate(
            Arrays.asList(
                    // Only pre-registered users of the queried domain are considered.
                    Aggregates.match(and(
                            eq(FIELD_REFERENCE_TYPE, DOMAIN.name()),
                            eq(FIELD_REFERENCE_ID, query.getDomain()),
                            eq(FIELD_PRE_REGISTRATION, true))),
                    Aggregates.group(new BasicDBObject("_id", query.getField()),
                            Accumulators.sum("total", 1),
                            Accumulators.sum("completed", new BasicDBObject("$cond", Arrays.asList(isCompleted, 1, 0))))
            )))
            .map(doc -> {
                Map<Object, Object> counters = new HashMap<>();
                // Keep every accumulated counter; only the group key is dropped.
                counters.putAll(doc.entrySet()
                        .stream()
                        .filter(entry -> !"_id".equals(entry.getKey()))
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
                return counters;
            })
            .first(Collections.emptyMap());
}
/**
 * Translates an aggregation call expression into a MongoDB {@code $group} accumulator.
 *
 * @param field      output field name of the accumulator
 * @param expression expression that must be a {@link Call} to an aggregation operator
 * @return the matching accumulator (AVG, COUNT, MAX, MIN or SUM)
 * @throws IllegalArgumentException if the expression is not a call, not an aggregation,
 *                                  or uses an unsupported aggregation operator
 */
private BsonField accumulator(String field, Expression expression) {
    Preconditions.checkArgument(expression instanceof Call, "not a call %s", expression);
    final Operator operator = ((Call) expression).operator();
    Preconditions.checkArgument(AggregationOperators.isAggregation(operator), "not an aggregation operator: %s", operator);
    // MongoDB field reference ("$<name>"); the name is resolved through `naming`.
    final String fieldRef = "$" + naming.get(extractPath(expression));

    if (operator == AggregationOperators.AVG) {
        return Accumulators.avg(field, fieldRef);
    }
    if (operator == AggregationOperators.COUNT) {
        // Counting ignores the field value: each document contributes 1.
        return Accumulators.sum(field, 1);
    }
    if (operator == AggregationOperators.MAX) {
        return Accumulators.max(field, fieldRef);
    }
    if (operator == AggregationOperators.MIN) {
        return Accumulators.min(field, fieldRef);
    }
    if (operator == AggregationOperators.SUM) {
        return Accumulators.sum(field, fieldRef);
    }
    throw new IllegalArgumentException(String.format("Unknown aggregation operator %s from %s", operator, expression));
}
/**
 * Builds {@code $first} accumulators for a {@code $group} stage over a snapshot collection. With the
 * input sorted newest-first, {@code $first} picks the newest value of each serialized-snapshot field.
 * The snapshot lifecycle field is always included, ahead of the requested fields.
 *
 * @param snapshotFields fields of a serialized snapshot to project.
 * @return list of group stage field accumulators, lifecycle first, then {@code snapshotFields} in order.
 */
private List<BsonField> asFirstSnapshotBsonFields(final String... snapshotFields) {
    final Stream<String> fieldNames = Stream.concat(Stream.of(LIFECYCLE), Arrays.stream(snapshotFields));
    return fieldNames
            .map(name -> Accumulators.first(name, String.format("$%s.%s", SERIALIZED_SNAPSHOT, name)))
            .collect(Collectors.toList());
}
/**
 * Maximum aggregation: the largest value of {@code maxField} among the documents matching {@code match}.
 *
 * @param collectionName name of the collection to aggregate
 * @param match          filter applied before grouping
 * @param maxField       field whose maximum is computed (without the leading {@code $})
 * @return the maximum value, or {@code null} if the aggregation produced no result document
 */
public Object max(String collectionName, Document match, String maxField) {
    Document result = getDB().getCollection(collectionName)
            .aggregate(Arrays.asList(
                    match(match),
                    group(null, Accumulators.max("_max", "$" + maxField))))
            .first();
    return result == null ? null : result.get("_max");
}
/**
 * Minimum aggregation: the smallest value of {@code minField} among the documents matching {@code match}.
 *
 * @param collectionName name of the collection to aggregate
 * @param match          filter applied before grouping
 * @param minField       field whose minimum is computed (without the leading {@code $})
 * @return the minimum value, or {@code null} if the aggregation produced no result document
 */
public Object min(String collectionName, Document match, String minField) {
    Document result = getDB().getCollection(collectionName)
            .aggregate(Arrays.asList(
                    match(match),
                    group(null, Accumulators.min("_min", "$" + minField))))
            .first();
    if (result == null) {
        return null;
    }
    return result.get("_min");
}
/**
 * Sum aggregation: the total of {@code sumField} over the documents matching {@code match}.
 *
 * @param collectionName name of the collection to aggregate
 * @param match          filter applied before grouping
 * @param sumField       field to add up (without the leading {@code $})
 * @return the sum converted to {@code double}, or {@code null} if the aggregation produced no result document
 */
public Double sum(String collectionName, Document match, String sumField) {
    AggregateIterable<Document> aggregate = getDB().getCollection(collectionName).aggregate(
            Arrays.asList(
                    match(match)
                    , group(null, Accumulators.sum("_sum", "$" + sumField))
            )
    );
    Document first = aggregate.first();
    if (first == null) {
        return null;
    }
    // $sum yields Int32/Int64 for integer input (and 0 when nothing is numeric), Double for doubles and
    // Decimal128 for decimals. Document.getDouble would throw ClassCastException for all but Double,
    // so widen through Number instead.
    Object total = first.get("_sum");
    return total instanceof Number ? ((Number) total).doubleValue() : null;
}
/**
 * Average aggregation: the mean of {@code avgField} over the documents matching {@code match}.
 *
 * @param collectionName name of the collection to aggregate
 * @param match          filter applied before grouping
 * @param avgField       field to average (without the leading {@code $})
 * @return the average converted to {@code double}; {@code null} if the aggregation produced no result
 *         document or the average itself is {@code null} (no numeric values)
 */
public Double avg(String collectionName, Document match, String avgField) {
    AggregateIterable<Document> aggregate = getDB().getCollection(collectionName).aggregate(
            Arrays.asList(
                    match(match)
                    , group(null, Accumulators.avg("_avg", "$" + avgField))
            )
    );
    Document first = aggregate.first();
    if (first == null) {
        return null;
    }
    // $avg returns Decimal128 for decimal input, which Document.getDouble cannot cast;
    // widen through Number so every numeric result type is accepted.
    Object average = first.get("_avg");
    return average instanceof Number ? ((Number) average).doubleValue() : null;
}
/**
 * Most-recent aggregation: the value of {@code lastField} in the last matching document once the
 * documents are ordered by {@code sort}.
 *
 * @param collectionName name of the collection to aggregate
 * @param match          filter applied before sorting
 * @param lastField      field whose last value is returned (without the leading {@code $})
 * @param sort           sort specification that defines which document is "last"
 * @return the last value, or {@code null} if the aggregation produced no result document
 */
public Object last(String collectionName, Document match, String lastField, Document sort) {
    Document result = getDB().getCollection(collectionName)
            .aggregate(Arrays.asList(
                    match(match),
                    sort(sort),
                    group(null, Accumulators.last("_last", "$" + lastField))))
            .first();
    return result == null ? null : result.get("_last");
}
/**
 * Population standard deviation of {@code stdDevField} over all documents matching {@code match}.
 *
 * @param collectionName name of the collection to aggregate
 * @param match          filter applied before grouping
 * @param stdDevField    field whose standard deviation is computed (without the leading {@code $})
 * @return the standard deviation, or {@code null} if the aggregation produced no result document
 */
public Double stdDevPop(String collectionName, Document match, String stdDevField) {
    Document result = getDB().getCollection(collectionName)
            .aggregate(Arrays.asList(
                    match(match),
                    group(null, Accumulators.stdDevPop("_stdDev", "$" + stdDevField))))
            .first();
    if (result == null) {
        return null;
    }
    return result.getDouble("_stdDev");
}
/**
 * Sample standard deviation of {@code stdDevField}, computed over a random {@code $sample} of
 * {@code sampleSize} matching documents. Because the sample is random, repeated calls may differ.
 *
 * @param collectionName name of the collection to aggregate
 * @param match          filter applied before sampling
 * @param stdDevField    field whose standard deviation is computed (without the leading {@code $})
 * @param sampleSize     number of documents to sample
 * @return the sample standard deviation, or {@code null} if the aggregation produced no result document
 */
public Double stdDevSamp(String collectionName, Document match, String stdDevField, int sampleSize) {
    Document result = getDB().getCollection(collectionName)
            .aggregate(Arrays.asList(
                    match(match),
                    sample(sampleSize),
                    group(null, Accumulators.stdDevSamp("_stdDev", "$" + stdDevField))))
            .first();
    return result == null ? null : result.getDouble("_stdDev");
}
/**
 * Checks whether {@code value} of {@code field} occurs in at least {@code minCount} documents matching
 * {@code match}, i.e. whether it would appear in the result of {@code sortMap} for the same arguments.
 *
 * @param collectionName name of the collection to aggregate
 * @param match          base filter; it is copied and NOT modified by this method
 * @param field          name of the field to group by (without the leading {@code $})
 * @param value          value of {@code field} to look for
 * @param minCount       minimum document count required ({@code $gte})
 * @return {@code true} if the group for {@code value} reaches {@code minCount}
 */
public boolean inSortMap(String collectionName, Document match, String field, Object value, int minCount) {
    // Work on a shallow copy: appending to `match` itself would leak the extra condition into the
    // caller's Document (and into every later query that reuses it).
    Document filter = new Document(match).append(field, value);
    AggregateIterable<Document> aggregate = getDB().getCollection(collectionName).aggregate(
            Arrays.asList(
                    match(filter)
                    , group("$" + field, Accumulators.sum("_count", 1))
                    , match(new Document("_count", new Document("$gte", minCount)))
            )
    );
    return aggregate.first() != null;
}
private Single<Map<Object, Object>> executeHistogram(AuditReportableCriteria criteria, Bson query) {
    // MongoDB emits no bucket for a time range without matching documents, so the expected
    // intervals are merged in afterwards to supply the missing buckets.
    Map<Long, Long> intervals = intervals(criteria);
    String fieldSuccess = (criteria.types().get(0) + "_" + Status.SUCCESS).toLowerCase();
    String fieldFailure = (criteria.types().get(0) + "_" + Status.FAILURE).toLowerCase();

    // Bucket key: epoch milliseconds of the timestamp, rounded down to a multiple of the interval.
    BasicDBObject bucketStart = new BasicDBObject("$subtract", Arrays.asList(
            new BasicDBObject("$subtract", Arrays.asList("$timestamp", new Date(0))),
            new BasicDBObject("$mod", Arrays.asList(
                    new BasicDBObject("$subtract", Arrays.asList("$timestamp", new Date(0))),
                    criteria.interval()))));
    BasicDBObject countSuccess = new BasicDBObject("$cond", Arrays.asList(
            new BasicDBObject("$eq", Arrays.asList("$outcome.status", Status.SUCCESS)), 1, 0));
    BasicDBObject countFailure = new BasicDBObject("$cond", Arrays.asList(
            new BasicDBObject("$eq", Arrays.asList("$outcome.status", Status.FAILURE)), 1, 0));

    return Observable.fromPublisher(reportableCollection.aggregate(Arrays.asList(
            Aggregates.match(query),
            Aggregates.group(
                    new BasicDBObject("_id", bucketStart),
                    Accumulators.sum(fieldSuccess, countSuccess),
                    Accumulators.sum(fieldFailure, countFailure)))))
            .toList()
            .map(docs -> {
                Map<Long, Long> successByBucket = new HashMap<>();
                Map<Long, Long> failureByBucket = new HashMap<>();
                docs.forEach(document -> {
                    // The group key is the sub-document {_id: <bucket start>}.
                    Long bucket = ((Number) ((Document) document.get("_id")).get("_id")).longValue();
                    Long successes = ((Number) document.get(fieldSuccess)).longValue();
                    Long failures = ((Number) document.get(fieldFailure)).longValue();
                    successByBucket.put(bucket, successes);
                    failureByBucket.put(bucket, failures);
                });
                // Fill the buckets MongoDB did not return with the interval defaults.
                intervals.forEach((bucket, defaultCount) -> {
                    successByBucket.putIfAbsent(bucket, defaultCount);
                    failureByBucket.putIfAbsent(bucket, defaultCount);
                });
                List<Long> successSeries = successByBucket.entrySet().stream()
                        .sorted(Map.Entry.comparingByKey())
                        .map(Map.Entry::getValue)
                        .collect(Collectors.toList());
                List<Long> failureSeries = failureByBucket.entrySet().stream()
                        .sorted(Map.Entry.comparingByKey())
                        .map(Map.Entry::getValue)
                        .collect(Collectors.toList());
                Map<Object, Object> histogram = new HashMap<>();
                histogram.put(fieldSuccess, successSeries);
                histogram.put(fieldFailure, failureSeries);
                return histogram;
            });
}
/**
 * Counts the documents matching {@code query} per value of {@code criteria.field()} and returns the
 * {@code criteria.size()} largest groups (50 when no size is given).
 *
 * @param criteria report criteria providing the grouping field and the maximum number of groups
 * @param query    filter applied before grouping
 * @return map from group value to document count (unordered)
 */
private Single<Map<Object, Object>> executeGroupBy(AuditReportableCriteria criteria, Bson query) {
    return Observable.fromPublisher(reportableCollection.aggregate(
            Arrays.asList(
                    Aggregates.match(query),
                    Aggregates.group(new BasicDBObject("_id", "$" + criteria.field()), Accumulators.sum("count", 1)),
                    // $group output order is undefined. Sort by count first so that $limit keeps the
                    // largest groups instead of an arbitrary subset.
                    Aggregates.sort(new BasicDBObject("count", -1)),
                    Aggregates.limit(criteria.size() != null ? criteria.size() : 50))
    ))
            .toList()
            .map(docs -> docs.stream().collect(Collectors.toMap(d -> ((Document) d.get("_id")).get("_id"), d -> d.get("count"))));
}
/**
 * Counts the cards of a collection per edition.
 *
 * @param c the collection whose cards are counted
 * @return map from edition to card count; keys compare case-insensitively
 * @throws SQLException declared by the interface contract
 */
@Override
public Map<String, Integer> getCardsCountGlobal(MagicCollection c) throws SQLException {
    // Case-insensitive keys: editions differing only by letter case share one entry.
    Map<String, Integer> map = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    List<Bson> aggr = Arrays.asList(Aggregates.match(Filters.eq(dbColIDField, c.getName())),
            Aggregates.group("$edition", Accumulators.sum("count", 1)));
    logger.trace(aggr.toString());
    // $group is case-sensitive, so "abc" and "ABC" arrive as separate groups. merge adds their
    // counts; a plain put would let the later group overwrite the earlier one.
    db.getCollection(colCards, BasicDBObject.class).aggregate(aggr).forEach(
            (Consumer<BasicDBObject>) document -> map.merge(document.getString("_id"), document.getInt("count"), Integer::sum));
    return map;
}
@Test
public void givenCountryCollection_whenNeighborsCalculated_thenMaxIsFifteenInChina() {
    // Project each country's name together with the size of its "borders" array.
    Bson neighborCountStage = project(Projections.fields(
            Projections.excludeId(),
            Projections.include("name"),
            Projections.computed("borderingCountries", Projections.computed("$size", "$borders"))));

    Bson maxNeighborsStage = group(null, Accumulators.max("max", "$borderingCountries"));
    int maxValue = collection.aggregate(Arrays.asList(neighborCountStage, maxNeighborsStage))
            .first()
            .getInteger("max");
    assertEquals(15, maxValue);

    Bson withMaxNeighborsStage = match(Filters.eq("borderingCountries", maxValue));
    Document maxNeighboredCountry = collection.aggregate(Arrays.asList(neighborCountStage, withMaxNeighborsStage))
            .first();
    assertTrue(maxNeighboredCountry.containsValue("China"));
}
/**
 * Reads one batch of the newest non-deleted snapshots, visiting PIDs in ascending order after {@code start}.
 * <p>
 * The pipeline sorts snapshots by PID ascending and sequence number descending, then limits the number of
 * snapshot documents to {@code batchSize}. It keeps the first (newest) snapshot of each PID, drops PIDs whose
 * newest snapshot has lifecycle {@code DELETED}, and reports the largest PID seen, including deleted ones.
 * NOTE(review): presumably callers pass that max PID back as {@code start} to page through the store — confirm.
 *
 * @param snapshotStore  collection holding the snapshots
 * @param start          exclusive lower bound on the PID ({@code $gt}); an empty string disables the bound.
 *                       Must not be {@code null} ({@code isEmpty()} is called on it).
 * @param batchSize      maximum number of snapshot DOCUMENTS read, not PIDs. A PID with several snapshots uses
 *                       several slots, so a batch can hold fewer than {@code batchSize} PIDs.
 * @param snapshotFields fields of the serialized snapshot copied into each item
 * @return a source emitting one {@link SnapshotBatch} (max PID plus surviving snapshot documents), or nothing
 *         when the aggregation returns no document or the returned document has no max PID
 */
private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
final MongoCollection<Document> snapshotStore,
final String start,
final int batchSize,
final String... snapshotFields) {
// NOTE(review): up to 7 stages are added below; the initial capacity of 5 is only a sizing hint.
final List<Bson> pipeline = new ArrayList<>(5);
// optional match stage: resume strictly after `start`
if (!start.isEmpty()) {
pipeline.add(Aggregates.match(Filters.gt(PROCESSOR_ID, start)));
}
// sort stage: newest snapshot (highest SN) first within each PID, so $first in the group stage picks it
pipeline.add(Aggregates.sort(Sorts.orderBy(Sorts.ascending(PROCESSOR_ID), Sorts.descending(SN))));
// limit stage. It should come before group stage or MongoDB would scan the entire journal collection.
pipeline.add(Aggregates.limit(batchSize));
// group stage 1: by PID
pipeline.add(Aggregates.group("$" + PROCESSOR_ID, asFirstSnapshotBsonFields(snapshotFields)));
// sort stage 2 -- order after group stage is not defined
pipeline.add(Aggregates.sort(Sorts.ascending(ID)));
// group stage 2: filter out pids whose latest snapshot is a deleted snapshot, but retain max encountered pid
// The null _id folds every PID group into a single output document.
final String maxPid = "m";
final String items = "i";
pipeline.add(Aggregates.group(null,
Accumulators.max(maxPid, "$" + ID),
// Push the whole group document ($$CURRENT) when not DELETED, otherwise push null.
Accumulators.push(
items,
new Document().append("$cond", new Document()
.append("if", new Document().append("$ne", Arrays.asList("$" + LIFECYCLE, "DELETED")))
.append("then", "$$CURRENT")
.append("else", null)
))
));
// remove null entries by projection
// NOTE(review): $setDifference returns a set, so MongoDB does not guarantee the order of the
// remaining items (nor keep duplicates); confirm callers do not rely on PID order within a batch.
pipeline.add(Aggregates.project(new Document()
.append(maxPid, 1)
.append(items, new Document()
.append("$setDifference", Arrays.asList("$" + items, Collections.singletonList(null)))
)
));
return Source.fromPublisher(snapshotStore.aggregate(pipeline))
.flatMapConcat(document -> {
final String theMaxPid = document.getString(maxPid);
// Defensive: a result without a max PID yields no batch.
if (theMaxPid == null) {
return Source.empty();
} else {
return Source.single(new SnapshotBatch(theMaxPid, document.getList(items, Document.class)));
}
});
}
/**
 * Adds a {@code $sum} accumulator writing the sum of {@code expression} to {@code fieldName}.
 *
 * @param fieldName  output field of the group stage
 * @param expression aggregation expression to add up, e.g. {@code "$amount"}
 * @return this builder, for chaining
 */
public MongoAccumulator sum(String fieldName, String expression) {
    this.accumulators.add(Accumulators.sum(fieldName, expression));
    return this;
}
/**
 * Adds an {@code $avg} accumulator writing the average of {@code expression} to {@code fieldName}.
 *
 * @param fieldName  output field of the group stage
 * @param expression aggregation expression to average
 * @return this builder, for chaining
 */
public MongoAccumulator avg(String fieldName, String expression) {
    this.accumulators.add(Accumulators.avg(fieldName, expression));
    return this;
}
/**
 * Adds a {@code $first} accumulator writing the value of {@code expression} from the first document
 * of each group to {@code fieldName}.
 *
 * @param fieldName  output field of the group stage
 * @param expression aggregation expression to evaluate
 * @return this builder, for chaining
 */
public MongoAccumulator first(String fieldName, String expression) {
    this.accumulators.add(Accumulators.first(fieldName, expression));
    return this;
}
/**
 * Adds a {@code $last} accumulator writing the value of {@code expression} from the last document
 * of each group to {@code fieldName}.
 *
 * @param fieldName  output field of the group stage
 * @param expression aggregation expression to evaluate
 * @return this builder, for chaining
 */
public MongoAccumulator last(String fieldName, String expression) {
    this.accumulators.add(Accumulators.last(fieldName, expression));
    return this;
}
/**
 * Adds a {@code $max} accumulator writing the largest value of {@code expression} to {@code fieldName}.
 *
 * @param fieldName  output field of the group stage
 * @param expression aggregation expression to compare
 * @return this builder, for chaining
 */
public MongoAccumulator max(String fieldName, String expression) {
    this.accumulators.add(Accumulators.max(fieldName, expression));
    return this;
}
/**
 * Adds a {@code $min} accumulator writing the smallest value of {@code expression} to {@code fieldName}.
 *
 * @param fieldName  output field of the group stage
 * @param expression aggregation expression to compare
 * @return this builder, for chaining
 */
public MongoAccumulator min(String fieldName, String expression) {
    this.accumulators.add(Accumulators.min(fieldName, expression));
    return this;
}
/**
 * Adds a {@code $push} accumulator collecting every value of {@code expression} into the array {@code fieldName}.
 *
 * @param fieldName  output field of the group stage
 * @param expression aggregation expression whose values are collected
 * @return this builder, for chaining
 */
public MongoAccumulator push(String fieldName, String expression) {
    this.accumulators.add(Accumulators.push(fieldName, expression));
    return this;
}
/**
 * Adds an {@code $addToSet} accumulator collecting the distinct values of {@code expression} into
 * the array {@code fieldName}.
 *
 * @param fieldName  output field of the group stage
 * @param expression aggregation expression whose distinct values are collected
 * @return this builder, for chaining
 */
public MongoAccumulator addToSet(String fieldName, String expression) {
    this.accumulators.add(Accumulators.addToSet(fieldName, expression));
    return this;
}
/**
 * Adds a {@code $stdDevPop} accumulator writing the population standard deviation of {@code expression}
 * to {@code fieldName}.
 *
 * @param fieldName  output field of the group stage
 * @param expression numeric aggregation expression
 * @return this builder, for chaining
 */
public MongoAccumulator stdDevPop(String fieldName, String expression) {
    this.accumulators.add(Accumulators.stdDevPop(fieldName, expression));
    return this;
}
/**
 * Adds a {@code $stdDevSamp} accumulator writing the sample standard deviation of {@code expression}
 * to {@code fieldName}.
 *
 * @param fieldName  output field of the group stage
 * @param expression numeric aggregation expression
 * @return this builder, for chaining
 */
public MongoAccumulator stdDevSamp(String fieldName, String expression) {
    this.accumulators.add(Accumulators.stdDevSamp(fieldName, expression));
    return this;
}
@Test
public void givenCountryCollection_whenCountedRegionWise_thenMaxInAfrica() {
    // Count countries per region, then order the regions by that count, largest first.
    Bson countPerRegion = group("$region", Accumulators.sum("tally", 1));
    Bson largestFirst = sort(Sorts.descending("tally"));
    Document maxCountriedRegion = collection.aggregate(Arrays.asList(countPerRegion, largestFirst)).first();
    assertTrue(maxCountriedRegion.containsValue("Africa"));
}