The examples below show how to use the com.mongodb.client.model.Sorts API class; follow the accompanying links to view the full source on GitHub.
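Before the individual examples, here is a minimal, self-contained sketch of the core factory methods (Sorts.ascending, Sorts.descending, Sorts.orderBy). The connection string, database, collection, and field names are placeholders for illustration only:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Sorts;
import org.bson.Document;
import org.bson.conversions.Bson;

public class SortsBasics {
    public static void main(String[] args) {
        // Placeholder connection string and names; adjust to your deployment.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> people = client.getDatabase("demo").getCollection("people");
            // orderBy combines single-key sorts left to right; this one renders to
            // the BSON document {"lastName": 1, "age": -1}.
            Bson combined = Sorts.orderBy(Sorts.ascending("lastName"), Sorts.descending("age"));
            for (Document doc : people.find().sort(combined)) {
                System.out.println(doc.toJson());
            }
        }
    }
}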
private void assertCollectionOrder(final String collectionName, final boolean exactOrdering) {
    List<Long> expectedIdOrder = LongStream.range(0, 100).boxed().collect(Collectors.toList());
    List<Long> idOrder =
            getCollection(collectionName).find().sort(Sorts.ascending("_id")).into(new ArrayList<>())
                    .stream()
                    .map(d -> d.getLong("id"))
                    .collect(Collectors.toList());
    assertEquals(
            new HashSet<>(expectedIdOrder),
            new HashSet<>(idOrder),
            format("%s missing expected values.", collectionName));
    if (exactOrdering) {
        assertEquals(expectedIdOrder, idOrder, format("%s is out of order.", collectionName));
    } else {
        assertNotEquals(
                expectedIdOrder, idOrder, format("%s unexpectedly in order.", collectionName));
    }
}
public PageInfo<MiniTransactionInfo> getBlockTxList(int chainId, int pageIndex, int pageSize, long blockHeight, int type) {
    Bson filter = null;
    if (type == 0) {
        filter = eq("height", blockHeight);
    } else {
        filter = and(eq("type", type), eq("height", blockHeight));
    }
    BlockHeaderInfo blockInfo = mongoBlockServiceImpl.getBlockHeader(chainId, blockHeight);
    if (blockInfo == null) {
        return null;
    }
    long count = mongoDBService.getCount(TX_TABLE + chainId, filter);
    List<MiniTransactionInfo> txList = new ArrayList<>();
    List<Document> docList = this.mongoDBService.pageQuery(TX_TABLE + chainId, filter, Sorts.descending("height"), pageIndex, pageSize);
    for (Document document : docList) {
        txList.add(MiniTransactionInfo.toInfo(document));
    }
    PageInfo<MiniTransactionInfo> pageInfo = new PageInfo<>(pageIndex, pageSize, count, txList);
    return pageInfo;
}
public PageInfo<PunishLogInfo> getPunishLogList(int chainId, int type, String address, int pageIndex, int pageSize) {
    Bson filter = null;
    if (type == 0 && !StringUtils.isBlank(address)) {
        filter = Filters.eq("address", address);
    } else if (type > 0 && StringUtils.isBlank(address)) {
        filter = Filters.eq("type", type);
    } else if (type > 0 && !StringUtils.isBlank(address)) {
        filter = Filters.and(eq("type", type), eq("address", address));
    }
    long totalCount = mongoDBService.getCount(PUNISH_TABLE + chainId, filter);
    List<Document> documentList = mongoDBService.pageQuery(PUNISH_TABLE + chainId, filter, Sorts.descending("time"), pageIndex, pageSize);
    List<PunishLogInfo> punishLogList = new ArrayList<>();
    for (Document document : documentList) {
        punishLogList.add(DocumentTransferTool.toInfo(document, PunishLogInfo.class));
    }
    PageInfo<PunishLogInfo> pageInfo = new PageInfo<>(pageIndex, pageSize, totalCount, punishLogList);
    return pageInfo;
}
@Override
public List<MiniBlockHeaderInfo> getBlockList(int chainId, long startTime, long endTime) {
    if (!CacheManager.isChainExist(chainId)) {
        return new ArrayList<>();
    }
    BasicDBObject fields = new BasicDBObject();
    fields.append("_id", 1).append("createTime", 1).append("txCount", 1).append("agentHash", 1)
            .append("agentId", 1).append("agentAlias", 1).append("size", 1).append("reward", 1);
    Bson filter = Filters.and(Filters.gt("createTime", startTime), Filters.lte("createTime", endTime));
    List<Document> docsList = this.mongoDBService.query(BLOCK_HEADER_TABLE + chainId, filter, fields, Sorts.descending("_id"));
    List<MiniBlockHeaderInfo> list = new ArrayList<>();
    for (Document document : docsList) {
        list.add(DocumentTransferTool.toInfo(document, "height", MiniBlockHeaderInfo.class));
    }
    return list;
}
/**
 * Builds a singleton list (always exactly one entry) containing a Bson document used for sorting,
 * based on the passed {@code sortDirection} and {@code sortKey}.
 *
 * @param sortDirection the {@link SortDirection} to apply.
 * @param sortKey the sorting key.
 * @return the singleton list containing the Bson sort document.
 */
private static List<Bson> getSortBson(final SortDirection sortDirection, final String sortKey) {
    requireNonNull(sortDirection);
    final Bson sort;
    switch (sortDirection) {
        case ASC:
            sort = Sorts.ascending(sortKey);
            break;
        case DESC:
            sort = Sorts.descending(sortKey);
            break;
        default:
            throw new IllegalStateException("Unknown SortDirection=" + sortDirection);
    }
    return Collections.singletonList(sort);
}
@Override
public void process(Consumer<Bson> consumer) {
    final Function<Collation, Bson> toSortFn = col -> {
        final String name = naming.get(col.path());
        return col.direction().isAscending() ? Sorts.ascending(name) : Sorts.descending(name);
    };
    BsonDocument sort = new BsonDocument();
    for (Collation collation : query.collations()) {
        sort.putAll(toSortFn.apply(collation).toBsonDocument(BsonDocument.class, codecRegistry));
    }
    if (!sort.isEmpty()) {
        consumer.accept(Aggregates.sort(sort));
    }
}
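The loop above merges each collation's single-key sort document into one BsonDocument by hand. The same combination can be expressed with Sorts.orderBy, which concatenates the keys left to right; a minimal sketch with hypothetical field names, using the same classes as the example above:

private static Bson combinedSortStage() {
    // Renders to {"lastName": 1, "createdAt": -1} -- the same merged shape the
    // loop builds before handing it to Aggregates.sort.
    return Aggregates.sort(Sorts.orderBy(
            Sorts.ascending("lastName"),
            Sorts.descending("createdAt")));
}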
@Bean
DatabaseNewsService databaseNews() {
    return () -> client.getDatabase("news")
            .getCollection("news")
            .find(News.class)
            .sort(Sorts.descending("publishedOn"))
            .filter(Filters.eq("category", "tech"));
}
@Override
public void subscribe(Subscriber<? super News> s) {
    FindPublisher<News> findPublisher = collection.find(News.class);
    findPublisher.sort(Sorts.descending("publishedOn"))
            .filter(Filters.and(
                    Filters.eq("category", category),
                    Filters.gt("publishedOn", today())
            ))
            .subscribe(s);
}
public List<PocRoundItem> getRoundItemList(int chainId, long roundIndex) {
    List<Document> list = this.mongoDBService.query(ROUND_ITEM_TABLE + chainId, eq("roundIndex", roundIndex), Sorts.ascending("order"));
    List<PocRoundItem> itemList = new ArrayList<>();
    for (Document document : list) {
        itemList.add(DocumentTransferTool.toInfo(document, "id", PocRoundItem.class));
    }
    return itemList;
}
public List<PocRound> getRoundList(int chainId, int pageIndex, int pageSize) {
    List<Document> list = this.mongoDBService.pageQuery(ROUND_TABLE + chainId, Sorts.descending("_id"), pageIndex, pageSize);
    List<PocRound> roundList = new ArrayList<>();
    for (Document document : list) {
        roundList.add(DocumentTransferTool.toInfo(document, "index", PocRound.class));
    }
    return roundList;
}
/**
 * @param type 0: last 14 days, 1: week, 2: month, 3: year, 4: all
 * @return the list of statistical key/value pairs
 */
public List<KeyValue> getStatisticalList(int chainId, int type, String field) {
    List<KeyValue> list = new ArrayList<>();
    long startTime = getStartTime(type);
    List<Document> documentList = mongoDBService.query(STATISTICAL_TABLE + chainId, gte("_id", startTime), Sorts.ascending("_id"));
    if (documentList.size() < 32) {
        for (Document document : documentList) {
            KeyValue keyValue = new KeyValue();
            keyValue.setKey(document.get("month") + "/" + document.get("date"));
            if (ANNUALIZE_REWARD.equals(field)) {
                keyValue.setValue(document.getDouble(field));
            } else if (CONSENSUS_LOCKED.equals(field)) {
                keyValue.setValue(new BigInteger(document.getString(field)));
            } else {
                keyValue.setValue(document.getLong(field));
            }
            list.add(keyValue);
        }
    } else {
        if (TX_COUNT.equals(field)) {
            summaryLong(list, documentList, field);
        } else if (ANNUALIZE_REWARD.equals(field)) {
            avgDouble(list, documentList, field);
        } else if (CONSENSUS_LOCKED.equals(field)) {
            avgBigInteger(list, documentList, field);
        } else {
            avgLong(list, documentList, field);
        }
    }
    return list;
}
public void deleteTxs(int chainId) {
    long totalCount = mongoDBService.getCount(TX_TABLE + chainId);
    if (totalCount > 1000000) {
        int deleteCount = (int) (totalCount - 1000000);
        BasicDBObject fields = new BasicDBObject();
        fields.append("_id", 1);
        List<Document> docList = this.mongoDBService.pageQuery(TX_TABLE + chainId, null, fields, Sorts.ascending("createTime"), 1, deleteCount);
        List<String> hashList = new ArrayList<>();
        for (Document document : docList) {
            hashList.add(document.getString("_id"));
        }
        mongoDBService.delete(TX_TABLE + chainId, Filters.in("_id", hashList));
    }
}
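mongoDBService above is a project-specific wrapper; pageQuery(..., 1, deleteCount) presumably fetches page 1 of size deleteCount, i.e. a find-sort-limit. Against the plain driver, the same "keep only the newest N transactions" pass might look like the following sketch (the collection handle and retention count are assumptions):

private static void trimOldest(MongoCollection<Document> txs, long keep) {
    long total = txs.countDocuments();
    if (total <= keep) {
        return;
    }
    List<String> oldestIds = new ArrayList<>();
    // Oldest first; project only _id so the cursor stays light.
    for (Document d : txs.find()
            .projection(Projections.include("_id"))
            .sort(Sorts.ascending("createTime"))
            .limit((int) (total - keep))) {
        oldestIds.add(d.getString("_id"));
    }
    txs.deleteMany(Filters.in("_id", oldestIds));
}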
public PageInfo<MiniTransactionInfo> getTxList(int chainId, int pageIndex, int pageSize, int type,
        boolean isHidden, long startTime, long endTime) {
    Bson filter = null;
    if (type > 0 && startTime > 0 && endTime > 0) {
        filter = Filters.and(Filters.eq("type", type), Filters.gte("createTime", startTime), Filters.lte("createTime", endTime));
    } else if (type > 0 && startTime > 0) {
        filter = Filters.and(Filters.eq("type", type), Filters.gte("createTime", startTime));
    } else if (type > 0 && endTime > 0) {
        filter = Filters.and(Filters.eq("type", type), Filters.lte("createTime", endTime));
    } else if (type > 0) {
        filter = Filters.eq("type", type);
    } else if (isHidden && startTime > 0 && endTime > 0) {
        filter = Filters.and(ne("type", 1), Filters.gte("createTime", startTime), Filters.lte("createTime", endTime));
    } else if (isHidden && startTime > 0) {
        filter = Filters.and(ne("type", 1), Filters.gte("createTime", startTime));
    } else if (isHidden && endTime > 0) {
        filter = Filters.and(ne("type", 1), Filters.lte("createTime", endTime));
    } else if (isHidden) {
        filter = ne("type", 1);
    } else if (startTime > 0 && endTime > 0) {
        filter = Filters.and(Filters.gte("createTime", startTime), Filters.lte("createTime", endTime));
    } else if (startTime > 0) {
        filter = Filters.gte("createTime", startTime);
    } else if (endTime > 0) {
        filter = Filters.lte("createTime", endTime);
    }
    long totalCount = mongoDBService.getCount(TX_TABLE + chainId, filter);
    List<Document> docList = this.mongoDBService.pageQuery(TX_TABLE + chainId, filter, Sorts.descending("createTime"), pageIndex, pageSize);
    List<MiniTransactionInfo> txList = new ArrayList<>();
    for (Document document : docList) {
        txList.add(MiniTransactionInfo.toInfo(document));
    }
    PageInfo<MiniTransactionInfo> pageInfo = new PageInfo<>(pageIndex, pageSize, totalCount, txList);
    return pageInfo;
}
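The branch ladder above enumerates every combination of type, isHidden, startTime and endTime. An equivalent, flatter construction collects the conditions and ANDs them at the end, as in this sketch (the helper name is hypothetical; returning null keeps this page's convention that a null filter means "match everything"):

private static Bson buildTxFilter(int type, boolean isHidden, long startTime, long endTime) {
    List<Bson> conditions = new ArrayList<>();
    // type > 0 takes precedence over isHidden, exactly as in the ladder above.
    if (type > 0) {
        conditions.add(Filters.eq("type", type));
    } else if (isHidden) {
        conditions.add(Filters.ne("type", 1));
    }
    if (startTime > 0) {
        conditions.add(Filters.gte("createTime", startTime));
    }
    if (endTime > 0) {
        conditions.add(Filters.lte("createTime", endTime));
    }
    if (conditions.isEmpty()) {
        return null; // match all
    }
    return conditions.size() == 1 ? conditions.get(0) : Filters.and(conditions);
}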
public PageInfo<MiniBlockHeaderInfo> pageQuery(int chainId, int pageIndex, int pageSize, String packingAddress, boolean filterEmptyBlocks) {
    if (!CacheManager.isChainExist(chainId)) {
        return new PageInfo<>(pageIndex, pageSize);
    }
    Bson filter = null;
    if (StringUtils.isNotBlank(packingAddress)) {
        filter = Filters.eq("packingAddress", packingAddress);
    }
    if (filterEmptyBlocks) {
        if (filter == null) {
            filter = Filters.gt("txCount", 1);
        } else {
            filter = Filters.and(filter, Filters.gt("txCount", 1));
        }
    }
    long totalCount = mongoDBService.getCount(BLOCK_HEADER_TABLE + chainId, filter);
    BasicDBObject fields = new BasicDBObject();
    fields.append("_id", 1).append("createTime", 1).append("txCount", 1).append("agentHash", 1)
            .append("agentId", 1).append("agentAlias", 1).append("size", 1).append("reward", 1);
    List<Document> docsList = this.mongoDBService.pageQuery(BLOCK_HEADER_TABLE + chainId, filter, fields, Sorts.descending("_id"), pageIndex, pageSize);
    List<MiniBlockHeaderInfo> list = new ArrayList<>();
    for (Document document : docsList) {
        list.add(DocumentTransferTool.toInfo(document, "height", MiniBlockHeaderInfo.class));
    }
    PageInfo<MiniBlockHeaderInfo> pageInfo = new PageInfo<>(pageIndex, pageSize, totalCount, list);
    return pageInfo;
}
@Override
public int getBlockPackageTxCount(int chainId, long startHeight, long endHeight) {
    if (!CacheManager.isChainExist(chainId)) {
        return 0;
    }
    BasicDBObject fields = new BasicDBObject();
    fields.append("txCount", 1);
    Bson filter = Filters.and(Filters.gt("_id", startHeight), Filters.lte("_id", endHeight));
    List<Document> docsList = this.mongoDBService.query(BLOCK_HEADER_TABLE + chainId, filter, fields, Sorts.descending("_id"));
    int count = 0;
    for (Document document : docsList) {
        count += document.getInteger("txCount");
    }
    return count;
}
@Override
public List<T> find(Query query) {
    query.purge();
    final List<Bson> sortSpec = query.getSortingSpecification().getMap().entrySet().stream().map(entry -> {
        if (entry.getValue().equals(SortOption.ASCENDING)) {
            return Sorts.ascending(entry.getKey());
        } else {
            return Sorts.descending(entry.getKey());
        }
    }).collect(Collectors.toList());
    final var expression = new QueryBuilder<>(schema).expression(query.getExpression());
    return mongoCollection.find(expression)
            .limit(query.getLimit())
            .skip(query.getSkip())
            .sort(Sorts.orderBy(sortSpec))
            .into(new ArrayList<>());
}
public static void printCollection(MongoCollection<Document> collection) {
    Bson bson = Sorts.ascending("fname");
    FindIterable<Document> docs = collection.find().sort(bson);
    int num = 0;
    for (Document doc : docs) {
        String name = doc.getString("fname");
        String relation = doc.getString("relation");
        System.out.printf("%4d. %s, %s%n", ++num, name, relation);
    }
}
public static void main(String[] args) {
    MongoClient client = new MongoClient("localhost", 27017);
    MongoDatabase friends = client.getDatabase("friends");
    MongoCollection<Document> relatives = friends.getCollection("relatives");
    Bson bson = Sorts.ascending("fname");
    FindIterable<Document> docs = relatives.find().sort(bson);
    int num = 0;
    for (Document doc : docs) {
        String name = doc.getString("fname");
        String relation = doc.getString("relation");
        System.out.printf("%4d. %s, %s%n", ++num, name, relation);
    }
}
private Source<String, NotUsed> listJournalPidsAbove(final MongoCollection<Document> journal, final String start,
        final int batchSize, final Duration maxBackOff, final int maxRestarts) {

    final List<Bson> pipeline = new ArrayList<>(5);
    // optional match stage
    if (!start.isEmpty()) {
        pipeline.add(Aggregates.match(Filters.gt(PROCESSOR_ID, start)));
    }
    // sort stage
    pipeline.add(Aggregates.sort(Sorts.ascending(PROCESSOR_ID)));
    // limit stage. It should come before the group stage, or MongoDB would scan the entire journal collection.
    pipeline.add(Aggregates.limit(batchSize));
    // group stage
    pipeline.add(Aggregates.group("$" + PROCESSOR_ID));
    // sort stage 2 -- order after group stage is not defined
    pipeline.add(Aggregates.sort(Sorts.ascending(ID)));
    final Duration minBackOff = Duration.ofSeconds(1L);
    final double randomFactor = 0.1;
    return RestartSource.onFailuresWithBackoff(minBackOff, maxBackOff, randomFactor, maxRestarts,
            () -> Source.fromPublisher(journal.aggregate(pipeline))
                    .filter(document -> document.containsKey(ID))
                    .map(document -> document.getString(ID)));
}
/**
 * Gets the SortOptions as {@link Bson}.
 *
 * @return the Bson
 */
public Bson getSortOptionsAsBson() {
    final List<Bson> sorts = new ArrayList<>();
    for (final SortOption sortOption : sortOptions) {
        final SortDirection sortDirection = sortOption.getSortDirection();
        final SortFieldExpression sortExpression = sortOption.getSortExpression();
        final List<Bson> currentSorts = GetSortBsonVisitor.apply(sortExpression, sortDirection);
        sorts.addAll(currentSorts);
    }
    return Sorts.orderBy(sorts);
}
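For reference, Sorts.orderBy preserves the encounter order of the accumulated keys; a hypothetical two-entry list renders like this:

Bson sort = Sorts.orderBy(Arrays.asList(
        Sorts.ascending("thingId"),
        Sorts.descending("attributes.test")));
// Renders to {"thingId": 1, "attributes.test": -1}; the test setup below builds
// its expected value the same way.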
@Override
public Source<Metadata, NotUsed> sudoStreamMetadata(final EntityId lowerBound) {
    final Bson notDeletedFilter = Filters.exists(FIELD_DELETE_AT, false);
    final Bson filter = lowerBound.isDummy()
            ? notDeletedFilter
            : Filters.and(notDeletedFilter, Filters.gt(FIELD_ID, lowerBound.toString()));
    final Bson relevantFieldsProjection =
            Projections.include(FIELD_ID, FIELD_REVISION, FIELD_POLICY_ID, FIELD_POLICY_REVISION,
                    FIELD_PATH_MODIFIED);
    final Bson sortById = Sorts.ascending(FIELD_ID);
    final Publisher<Document> publisher = collection.find(filter)
            .projection(relevantFieldsProjection)
            .sort(sortById);
    return Source.fromPublisher(publisher).map(MongoThingsSearchPersistence::readAsMetadata);
}
@Before
public void before() {
    final SortFieldExpression sortExp1 = EFT.sortByThingId();
    final SortFieldExpression sortExp2 = EFT.sortByAttribute("test");
    knownSortOptions =
            Arrays.asList(new SortOption(sortExp1, SortDirection.ASC),
                    new SortOption(sortExp2, SortDirection.DESC));
    final String thingIdFieldName = ((SimpleFieldExpressionImpl) EFT.sortByThingId()).getFieldName();
    final String attributeFieldName = "attributes.test";
    knownSortOptionsExpectedBson = Sorts.orderBy(Arrays.asList(
            Sorts.ascending(thingIdFieldName),
            Sorts.descending(FIELD_SORTING + DOT + attributeFieldName)));
}
@Override
public CompletableFuture<List<DomainEventStream>> queryAggregateEventsAsync(String aggregateRootId, String aggregateRootTypeName, int minVersion, int maxVersion) {
    return IOHelper.tryIOFuncAsync(() -> {
        CompletableFuture<List<DomainEventStream>> future = new CompletableFuture<>();
        Bson filter = Filters.and(Filters.eq("aggregateRootId", aggregateRootId),
                Filters.gte("version", minVersion),
                Filters.lte("version", maxVersion));
        Bson sort = Sorts.ascending("version");
        mongoClient.getDatabase(mongoConfiguration.getDatabaseName()).getCollection(mongoConfiguration.getEventCollectionName())
                .find(filter).sort(sort).subscribe(new Subscriber<Document>() {
                    final List<DomainEventStream> streams = Lists.newArrayList();

                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Document document) {
                        DomainEventStream eventStream = new DomainEventStream(
                                document.getString("commandId"),
                                document.getString("aggregateRootId"),
                                document.getString("aggregateRootTypeName"),
                                document.get("gmtCreate", Date.class),
                                eventSerializer.deserialize(JsonTool.deserialize(document.getString("events"), Map.class), IDomainEvent.class),
                                Maps.newHashMap());
                        streams.add(eventStream);
                    }

                    @Override
                    public void onError(Throwable t) {
                        future.completeExceptionally(t);
                    }

                    @Override
                    public void onComplete() {
                        streams.sort(Comparator.comparingInt(DomainEventStream::getVersion));
                        future.complete(streams);
                    }
                });
        return future.exceptionally(throwable -> {
            if (throwable instanceof MongoWriteException) {
                MongoWriteException ex = (MongoWriteException) throwable;
                String errorMessage = String.format("Failed to query aggregate events async, aggregateRootId: %s, aggregateRootType: %s", aggregateRootId, aggregateRootTypeName);
                logger.error(errorMessage, ex);
                throw new IORuntimeException(throwable);
            }
            logger.error("Failed to query aggregate events async, aggregateRootId: {}, aggregateRootType: {}", aggregateRootId, aggregateRootTypeName, throwable);
            throw new EnodeRuntimeException(throwable);
        });
    }, "QueryAggregateEventsAsync");
}
private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(
        final MongoCollection<Document> snapshotStore,
        final String start,
        final int batchSize,
        final String... snapshotFields) {

    final List<Bson> pipeline = new ArrayList<>(5);
    // optional match stage
    if (!start.isEmpty()) {
        pipeline.add(Aggregates.match(Filters.gt(PROCESSOR_ID, start)));
    }
    // sort stage
    pipeline.add(Aggregates.sort(Sorts.orderBy(Sorts.ascending(PROCESSOR_ID), Sorts.descending(SN))));
    // limit stage. It should come before the group stage, or MongoDB would scan the entire snapshot collection.
    pipeline.add(Aggregates.limit(batchSize));
    // group stage 1: by PID
    pipeline.add(Aggregates.group("$" + PROCESSOR_ID, asFirstSnapshotBsonFields(snapshotFields)));
    // sort stage 2 -- order after group stage is not defined
    pipeline.add(Aggregates.sort(Sorts.ascending(ID)));
    // group stage 2: filter out pids whose latest snapshot is a deleted snapshot, but retain max encountered pid
    final String maxPid = "m";
    final String items = "i";
    pipeline.add(Aggregates.group(null,
            Accumulators.max(maxPid, "$" + ID),
            Accumulators.push(
                    items,
                    new Document().append("$cond", new Document()
                            .append("if", new Document().append("$ne", Arrays.asList("$" + LIFECYCLE, "DELETED")))
                            .append("then", "$$CURRENT")
                            .append("else", null)
                    ))
    ));
    // remove null entries by projection
    pipeline.add(Aggregates.project(new Document()
            .append(maxPid, 1)
            .append(items, new Document()
                    .append("$setDifference", Arrays.asList("$" + items, Collections.singletonList(null)))
            )
    ));
    return Source.fromPublisher(snapshotStore.aggregate(pipeline))
            .flatMapConcat(document -> {
                final String theMaxPid = document.getString(maxPid);
                if (theMaxPid == null) {
                    return Source.empty();
                } else {
                    return Source.single(new SnapshotBatch(theMaxPid, document.getList(items, Document.class)));
                }
            });
}
@Test
public void givenCountryCollection_whenCountedRegionWise_thenMaxInAfrica() {
    Document maxCountriedRegion = collection.aggregate(Arrays.asList(group("$region", Accumulators.sum("tally", 1)), sort(Sorts.descending("tally"))))
            .first();
    assertTrue(maxCountriedRegion.containsValue("Africa"));
}
@Test
public void givenCountryCollection_whenAreaSortedDescending_thenSuccess() {
    collection.aggregate(Arrays.asList(sort(Sorts.descending("area")), limit(7), out("largest_seven")))
            .toCollection();
    MongoCollection<Document> largestSeven = database.getCollection("largest_seven");
    assertEquals(7, largestSeven.countDocuments());
    Document usa = largestSeven.find(Filters.eq("alpha3Code", "USA"))
            .first();
    assertNotNull(usa);
}