下面列出了com.mongodb.client.MongoCursor#hasNext ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public List<Document> getDocuments(String keyName, String keyValue,
RESOURCE_TYPE resType, String sortKey, boolean asc, int limit) {
ArrayList<Document> docList = new ArrayList<Document>();
BasicDBObject query = new BasicDBObject(keyName, keyValue).append(
RESTYPE_KEY, resType.Value());
BasicDBObject sort = new BasicDBObject(sortKey, asc ? 1 : -1);
MongoCollection<Document> collection = context.getDatabaseManager()
.getCollection(collectionName);
MongoCursor<Document> cursor = collection.find(query).sort(sort)
.limit(limit).iterator();
while (cursor.hasNext()) {
docList.add(cursor.next());
}
return docList;
}
/**
* @return
*/
public List<Park> getAll() {
System.out.println("[DEBUG] MongoDBConnection.getAll()");
ArrayList<Park> allParksList = new ArrayList<Park>();
if (mongoDB != null) {
try {
MongoCollection parks = mongoDB.getCollection(COLLECTION);
MongoCursor<Document> cursor = parks.find().iterator();
try {
while (cursor.hasNext()) {
allParksList.add(ParkReadConverter.convert(cursor.next()));
}
} finally {
cursor.close();
}
} catch (Exception e) {
System.out.println("[ERROR] Error connecting to MongoDB. " + e.getMessage());
}
} else {
System.out.println("[ERROR] mongoDB could not be initiallized. No operation with DB will be performed");
}
return allParksList;
}
/**
* Geospatial query
*
* @param key should be indexed by 2dsphere
* db.vertices.createIndex({"urn:oliot:ubv:mda:gps" : "2dsphere"})
* @param lon
* @param lat
* @param radius in metres db.vertices.find({ "urn:oliot:ubv:mda:gps" : { $near
* : { $geometry: { type: "Point", coordinates: [ -1.1673,52.93]},
* $maxDistance: 50000}}})
*
* @return
*/
public Stream<ChronoVertex> getChronoVertexStream(String key, double lon, double lat, double radius) {
HashSet<ChronoVertex> ret = new HashSet<ChronoVertex>();
BsonArray coordinates = new BsonArray();
coordinates.add(new BsonDouble(lon));
coordinates.add(new BsonDouble(lat));
BsonDocument geometry = new BsonDocument();
geometry.put("type", new BsonString("Point"));
geometry.put("coordinates", coordinates);
BsonDocument near = new BsonDocument();
near.put("$geometry", geometry);
near.put("$maxDistance", new BsonDouble(radius));
BsonDocument geoquery = new BsonDocument();
geoquery.put("$near", near);
BsonDocument queryDoc = new BsonDocument();
queryDoc.put(key, geoquery);
MongoCursor<BsonDocument> cursor = vertices.find(queryDoc).projection(Tokens.PRJ_ONLY_ID).iterator();
while (cursor.hasNext()) {
BsonDocument v = cursor.next();
ret.add(new ChronoVertex(v.getString(Tokens.ID).getValue(), this));
}
return ret.parallelStream();
}
private SetMultimap<ReNounFact, ScoredPattern> getFactToScoredPatternMultimap(
MongoCollection<Document> factsCollection,
MongoCollection<Document> scoredPatternsCollection) {
SetMultimap<ReNounFact, ScoredPattern> factToPatternMap = HashMultimap.create();
MongoCursor<Document> scoredPatternsCursor = scoredPatternsCollection.find().iterator();
while (scoredPatternsCursor.hasNext()) {
Document scoredPatternDocument = scoredPatternsCursor.next();
Iterator<Document> factsMatchingScoredPatternIterator =
factsCollection
.find(eq(PATTERN_FACT_FIELD, scoredPatternDocument.get(PATTERN_FACT_FIELD)))
.iterator();
while (factsMatchingScoredPatternIterator.hasNext()) {
Document factMatchingScoredPattern = factsMatchingScoredPatternIterator.next();
ReNounFact fact = new ReNounFact(factMatchingScoredPattern);
ScoredPattern scoredPattern = new ScoredPattern(scoredPatternDocument);
factToPatternMap.put(fact, scoredPattern);
}
}
return factToPatternMap;
}
public void testDatabase() {
MongoClient mongoClient = mongo.getClient("mongo1");
ListDatabasesIterable<Document> list = mongoClient.listDatabases();
MongoCursor<Document> iterD = list.iterator();
while (iterD.hasNext()) {
Document doc = iterD.next();
System.out.println(doc);
if (!doc.getBoolean("empty", true)) {
System.out.println(mongoClient.getDatabase(doc
.getString("name")));
}
}
// MongoIterable<String> mongo = mongoClient.listDatabaseNames();
// MongoCursor<String> iter = mongo.iterator();
// while (iter.hasNext()) {
// System.out.println(iter.next());
// }
}
public void testDatabase() {
ListDatabasesIterable<Document> list = mongoClient.listDatabases();
MongoCursor<Document> iterD = list.iterator();
while (iterD.hasNext()) {
Document doc = iterD.next();
System.out.println(doc);
if (!doc.getBoolean("empty", true)) {
System.out.println(mongoClient.getDatabase(doc
.getString("name")));
}
}
// MongoIterable<String> mongo = mongoClient.listDatabaseNames();
// MongoCursor<String> iter = mongo.iterator();
// while (iter.hasNext()) {
// System.out.println(iter.next());
// }
}
public boolean fileHasProperties(String name, String bucketName, Map<String, String> attrs) {
GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
MongoCursor it = bucket.find(Document.parse(String.format("{ \"filename\": \"%s\" }", name))).iterator();
boolean retVal = false;
if (it.hasNext()) {
GridFSFile file = (GridFSFile)it.next();
Document metadata = file.getMetadata();
if (metadata != null && metadata.size() == attrs.size()) {
retVal = true;
for (Map.Entry<String, Object> entry : metadata.entrySet()) {
Object val = attrs.get(entry.getKey());
if (val == null || !entry.getValue().equals(val)) {
retVal = false;
break;
}
}
}
}
it.close();
return retVal;
}
public static <T extends IEntity> List<T> toEntities(Class<T> entity, MongoIterable<Document> iterable) throws Exception {
MongoCursor<Document> _documentIt = iterable.iterator();
List<T> _resultSet = new ArrayList<T>();
while (_documentIt.hasNext()) {
_resultSet.add(toEntity(entity, _documentIt.next()));
}
return _resultSet;
}
/**
* Converts a query cursor to a list.
*
* @param cursor
* the query cursor.
* @return a list of documents.
*/
public List<Document> fromCursorToList(MongoCursor<Document> cursor) {
List<Document> list = new ArrayList<Document>();
while (cursor.hasNext()) {
list.add(cursor.next());
}
cursor.close();
return list;
}
/**
* Return an iterable to all the vertices in the graph. If this is not possible
* for the implementation, then an UnsupportedOperationException can be thrown.
*
* @return an iterable reference to all vertices in the graph
*/
public Iterable<ChronoVertex> getChronoVertices() {
HashSet<String> idSet = new HashSet<String>();
Function<BsonString, String> mapper = new Function<BsonString, String>() {
@Override
public String apply(BsonString val) {
return val.getValue();
}
};
HashSet<String> outV = new HashSet<String>();
edges.distinct(Tokens.OUT_VERTEX, BsonString.class)
.filter(new BsonDocument(Tokens.OUT_VERTEX, new BsonDocument(Tokens.FC.$ne.toString(), new BsonNull())))
.map(mapper).into(outV);
idSet.addAll(outV);
HashSet<String> inV = new HashSet<String>();
edges.distinct(Tokens.IN_VERTEX, BsonString.class)
.filter(new BsonDocument(Tokens.IN_VERTEX, new BsonDocument(Tokens.FC.$ne.toString(), new BsonNull())))
.map(mapper).into(inV);
idSet.addAll(inV);
MongoCursor<BsonDocument> vi = vertices.find(Tokens.FLT_VERTEX_FIELD_NOT_INCLUDED)
.projection(Tokens.PRJ_ONLY_ID).iterator();
while (vi.hasNext()) {
BsonDocument d = vi.next();
idSet.add(d.getString(Tokens.ID).getValue());
}
HashSet<String> vertex = new HashSet<String>();
vertices.distinct(Tokens.VERTEX, BsonString.class)
.filter(new BsonDocument(Tokens.VERTEX, new BsonDocument(Tokens.FC.$ne.toString(), new BsonNull())))
.map(mapper).into(vertex);
idSet.addAll(vertex);
return idSet.parallelStream().map(s -> new ChronoVertex(s, this)).collect(Collectors.toSet());
}
private int updateMatches(final List<MongoCursor<Document>> cursorList,
final List<CanvasMatches> matchesList,
final int index) {
CanvasMatches canvasMatches = null;
final MongoCursor<Document> cursor = cursorList.get(index);
if (cursor.hasNext()) {
canvasMatches = CanvasMatches.fromJson(cursor.next().toJson());
}
matchesList.set(index, canvasMatches);
return (canvasMatches == null ? 1 : 0);
}
@Override
public void transactionMarker() throws InterruptedException {
MongoDatabase database = mongoClient.getDatabase("testdb");
MongoCollection<Document> collection = database.getCollection("test");
MongoCursor<Document> i = collection.find().iterator();
while (i.hasNext()) {
i.next();
}
}
/**
* Return an iterable to all the edges in the graph. If this is not possible for
* the implementation, then an UnsupportedOperationException can be thrown.
*
* @return an iterable reference to all edges in the graph
*/
public Iterable<ChronoEdge> getChronoEdges() {
HashSet<ChronoEdge> ret = new HashSet<ChronoEdge>();
MongoCursor<BsonDocument> cursor = edges.find().projection(Tokens.PRJ_ONLY_OUTV_LABEL_INV).iterator();
while (cursor.hasNext()) {
BsonDocument v = cursor.next();
String outV = v.getString(Tokens.OUT_VERTEX).getValue();
String label = v.getString(Tokens.LABEL).getValue();
String inV = v.getString(Tokens.IN_VERTEX).getValue();
String id = outV + "|" + label + "|" + inV;
ret.add(new ChronoEdge(id, outV, inV, label, this));
}
return ret;
}
/**
* Return an iterable to all the vertices in the graph that have a particular
* key/value property. If this is not possible for the implementation, then an
* UnsupportedOperationException can be thrown. The graph implementation should
* use indexing structures to make this efficient else a full vertex-filter scan
* is required.
*
* @param key the key of vertex
* @param value the value of the vertex
* @return an iterable of vertices with provided key and value
*/
public Set<ChronoVertex> getChronoVertexSet(String key, Object value) {
ElementHelper.validateProperty(null, key, value);
HashSet<ChronoVertex> ret = new HashSet<ChronoVertex>();
MongoCursor<BsonDocument> cursor = vertices
.find(Tokens.FLT_VERTEX_FIELD_NOT_INCLUDED.append(key, (BsonValue) value))
.projection(Tokens.PRJ_ONLY_ID).iterator();
while (cursor.hasNext()) {
BsonDocument v = cursor.next();
ret.add(new ChronoVertex(v.getString(Tokens.ID).getValue(), this));
}
return ret;
}
/**
* Return an iterable to all the edges in the graph that have a particular
* key/value property. If this is not possible for the implementation, then an
* UnsupportedOperationException can be thrown. The graph implementation should
* use indexing structures to make this efficient else a full edge-filter scan
* is required.
*
* @param key the key of the edge
* @param value the value of the edge
* @return an iterable of edges with provided key and value
*/
public Iterable<ChronoEdge> getChronoEdges(String key, Object value) {
ElementHelper.validateProperty(null, key, value);
HashSet<ChronoEdge> ret = new HashSet<ChronoEdge>();
MongoCursor<BsonDocument> cursor = edges.find(Tokens.FLT_EDGE_FIELD_NOT_INCLUDED.append(key, (BsonValue) value))
.projection(Tokens.PRJ_ONLY_ID).iterator();
while (cursor.hasNext()) {
BsonDocument v = cursor.next();
ret.add(new ChronoEdge(v.getString(Tokens.ID).getValue(), this));
}
return ret;
}
public ArrayList<CachedChronoVertex> getCachedChronoVertices(BsonArray filters, String sortKey, Boolean isDesc,
Integer limit) {
ArrayList<CachedChronoVertex> vList = new ArrayList<CachedChronoVertex>();
// Merge All the queries with $and
CachedChronoGraph g = new CachedChronoGraph();
BsonDocument baseQuery = new BsonDocument();
FindIterable<BsonDocument> cursor;
if (filters.isEmpty() == false) {
baseQuery.put("$and", filters);
cursor = vertices.find(baseQuery);
} else {
cursor = vertices.find();
}
if (sortKey != null) {
if (isDesc == null)
cursor.sort(new BsonDocument(sortKey, new BsonInt32(-1)));
else if (isDesc == true) {
cursor.sort(new BsonDocument(sortKey, new BsonInt32(-1)));
} else
cursor.sort(new BsonDocument(sortKey, new BsonInt32(1)));
}
if (limit != null)
cursor.limit(limit);
MongoCursor<BsonDocument> iter = cursor.iterator();
while (iter.hasNext()) {
BsonDocument doc = iter.next();
String vid = doc.remove("_id").asString().getValue();
CachedChronoVertex v = g.getChronoVertex(vid);
v.setProperties(doc);
vList.add(v);
}
return vList;
}
public List<SemanticDescriptor> getSemanticDescriptorWithParentId(String pi) throws OneM2MException {
ArrayList<SemanticDescriptor> semanticDescriptors = new ArrayList<SemanticDescriptor>();
BasicDBObject query = new BasicDBObject(PARENTID_KEY, pi).append(RESTYPE_KEY, RESOURCE_TYPE.SEMANTICDESCRIPTOR.Value());
MongoCollection<Document> collection = context.getDatabaseManager().getCollection(collectionName);
MongoCursor<Document> cursor = collection.find(query).iterator();
while(cursor.hasNext()) {
semanticDescriptors.add((SemanticDescriptor)createResourceWithDoc(cursor.next()));
}
return semanticDescriptors;
}
/**
* Scans DocumentDB using the scan settings set on the requested Split by DocDBeMetadataHandler.
*
* @see RecordHandler
*/
@Override
protected void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recordsRequest, QueryStatusChecker queryStatusChecker)
{
TableName tableName = recordsRequest.getTableName();
Map<String, ValueSet> constraintSummary = recordsRequest.getConstraints().getSummary();
MongoClient client = getOrCreateConn(recordsRequest.getSplit());
MongoDatabase db = client.getDatabase(tableName.getSchemaName());
MongoCollection<Document> table = db.getCollection(tableName.getTableName());
Document query = QueryUtils.makeQuery(recordsRequest.getSchema(), constraintSummary);
Document output = QueryUtils.makeProjection(recordsRequest.getSchema());
logger.info("readWithConstraint: query[{}] projection[{}]", query, output);
final MongoCursor<Document> iterable = table
.find(query)
.projection(output)
.batchSize(MONGO_QUERY_BATCH_SIZE).iterator();
long numRows = 0;
AtomicLong numResultRows = new AtomicLong(0);
while (iterable.hasNext() && queryStatusChecker.isQueryRunning()) {
numRows++;
spiller.writeRows((Block block, int rowNum) -> {
Document doc = iterable.next();
boolean matched = true;
for (Field nextField : recordsRequest.getSchema().getFields()) {
Object value = TypeUtils.coerce(nextField, doc.get(nextField.getName()));
Types.MinorType fieldType = Types.getMinorTypeForArrowType(nextField.getType());
try {
switch (fieldType) {
case LIST:
case STRUCT:
matched &= block.offerComplexValue(nextField.getName(), rowNum, DEFAULT_FIELD_RESOLVER, value);
break;
default:
matched &= block.offerValue(nextField.getName(), rowNum, value);
break;
}
if (!matched) {
return 0;
}
}
catch (Exception ex) {
throw new RuntimeException("Error while processing field " + nextField.getName(), ex);
}
}
numResultRows.getAndIncrement();
return 1;
});
}
logger.info("readWithConstraint: numRows[{}] numResultRows[{}]", numRows, numResultRows.get());
}
private void configureMongos() throws Exception {
Document cr;
MongoClientSettings options = MongoClientSettings.builder()
.applyToSocketSettings(builder -> builder.connectTimeout(10, TimeUnit.SECONDS))
.applyToClusterSettings(builder -> builder.hosts(Collections.singletonList(toAddress(this.config.net()))))
.build();
try (MongoClient mongo = MongoClients.create(options)) {
MongoDatabase mongoAdminDB = mongo.getDatabase(ADMIN_DATABASE_NAME);
// Add shard from the replica set list
for (Entry<String, List<IMongodConfig>> entry : this.replicaSets
.entrySet()) {
String replicaName = entry.getKey();
String command = "";
for (IMongodConfig mongodConfig : entry.getValue()) {
if (command.isEmpty()) {
command = replicaName + "/";
} else {
command += ",";
}
command += mongodConfig.net().getServerAddress().getHostName()
+ ":" + mongodConfig.net().getPort();
}
logger.info("Execute add shard command: {}", command);
cr = mongoAdminDB.runCommand(new Document("addShard", command));
logger.info(cr.toString());
}
logger.info("Execute list shards.");
cr = mongoAdminDB.runCommand(new Document("listShards", 1));
logger.info(cr.toString());
// Enabled sharding at database level
logger.info("Enabled sharding at database level");
cr = mongoAdminDB.runCommand(new Document("enableSharding",
this.shardDatabase));
logger.info(cr.toString());
// Create index in sharded collection
logger.info("Create index in sharded collection");
MongoDatabase db = mongo.getDatabase(this.shardDatabase);
db.getCollection(this.shardCollection).createIndex(new Document(this.shardKey, 1));
// Shard the collection
logger.info("Shard the collection: {}.{}", this.shardDatabase, this.shardCollection);
Document cmd = new Document();
cmd.put("shardCollection", this.shardDatabase + "." + this.shardCollection);
cmd.put("key", new BasicDBObject(this.shardKey, 1));
cr = mongoAdminDB.runCommand(cmd);
logger.info(cr.toString());
logger.info("Get info from config/shards");
FindIterable<Document> cursor = mongo.getDatabase("config").getCollection("shards").find();
MongoCursor<Document> iterator = cursor.iterator();
while (iterator.hasNext()) {
Document item = iterator.next();
logger.info(item.toString());
}
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile input = null;
if (context.hasIncomingConnection()) {
input = session.get();
if (input == null && context.hasNonLoopConnection()) {
return;
}
}
final String database = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(input).getValue();
final String collection = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(input).getValue();
final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(input).getValue();
final Document query = getQuery(context, session, input);
MongoCollection mongoCollection = clientService.getDatabase(database).getCollection(collection);
FindIterable<Document> find = mongoCollection.find(query);
if (context.getProperty(SORT).isSet()) {
find = find.sort(Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue()));
}
if (context.getProperty(PROJECTION).isSet()) {
find = find.projection(Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()));
}
if (context.getProperty(LIMIT).isSet()) {
find = find.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
}
MongoCursor<Document> cursor = find.iterator();
FlowFile output = input != null ? session.create(input) : session.create();
final FlowFile inputPtr = input;
try {
final Map<String, String> attributes = getAttributes(context, input, query, mongoCollection);
try (OutputStream out = session.write(output)) {
Map<String, String> attrs = inputPtr != null ? inputPtr.getAttributes() : new HashMap<String, String>(){{
put("schema.name", schemaName);
}};
RecordSchema schema = writerFactory.getSchema(attrs, null);
RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, attrs);
long count = 0L;
writer.beginRecordSet();
while (cursor.hasNext()) {
Document next = cursor.next();
if (next.get("_id") instanceof ObjectId) {
next.put("_id", next.get("_id").toString());
}
Record record = new MapRecord(schema, next);
writer.write(record);
count++;
}
writer.finishRecordSet();
writer.close();
out.close();
attributes.put("record.count", String.valueOf(count));
} catch (SchemaNotFoundException e) {
throw new RuntimeException(e);
}
output = session.putAllAttributes(output, attributes);
session.getProvenanceReporter().fetch(output, getURI(context));
session.transfer(output, REL_SUCCESS);
if (input != null) {
session.transfer(input, REL_ORIGINAL);
}
} catch (Exception ex) {
ex.printStackTrace();
getLogger().error("Error writing record set from Mongo query.", ex);
session.remove(output);
if (input != null) {
session.transfer(input, REL_FAILURE);
}
}
}