下面列出了com.mongodb.client.MongoCursor#close ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public AclEntity getAcl(String topic, String username, String clientId) {
FindIterable<Document> findIterable = aclCollection.find(eq("username", username));
MongoCursor<Document> cursor = findIterable.iterator();
AclEntity acl = null;
if(cursor.hasNext()) {
Document document = cursor.next();
acl = new AclEntity();
acl.username = username;
acl.clientId = clientId;
acl.topic = topic;
acl.canPublish = (document.getInteger("write") == 1);
acl.canSubscribe = (document.getInteger("read") == 1);
}
cursor.close();
return acl;
}
@Override
long executeQuery(int threadId, long threadRunCount, long globalRunCount, long selectorId, long randomId){
final MongoCursor<Document> cursor = mongoCollection.find(eq(queriedField, selectorId)).iterator();
//final MongoCursor<Document> cursor = mongoCollection.find(in(queriedField, selectorId, selectorId+1, selectorId+2, selectorId+3, selectorId+4)).iterator();
long result = 0;
try {
while (cursor.hasNext()) {
final Document doc = cursor.next();
LOG.debug("Document {}", doc.toJson());
result++;
}
} finally {
cursor.close();
}
return result;
}
/**
* @return
*/
public List<Park> getAll() {
System.out.println("[DEBUG] MongoDBConnection.getAll()");
ArrayList<Park> allParksList = new ArrayList<Park>();
if (mongoDB != null) {
try {
MongoCollection parks = mongoDB.getCollection(COLLECTION);
MongoCursor<Document> cursor = parks.find().iterator();
try {
while (cursor.hasNext()) {
allParksList.add(ParkReadConverter.convert(cursor.next()));
}
} finally {
cursor.close();
}
} catch (Exception e) {
System.out.println("[ERROR] Error connecting to MongoDB. " + e.getMessage());
}
} else {
System.out.println("[ERROR] mongoDB could not be initiallized. No operation with DB will be performed");
}
return allParksList;
}
/**
* @param query
* @return
*/
public List<Park> getByQuery(BasicDBObject query) {
System.out.println("[DEBUG] MongoDBConnection.getByQuery()");
List<Park> parks = new ArrayList<Park>();
if (mongoDB != null) {
try {
MongoCursor<Document> cursor = mongoDB.getCollection(COLLECTION).find(query).iterator();
int i = 0;
try {
while (cursor.hasNext()) {
parks.add(ParkReadConverter.convert(cursor.next()));
}
} finally {
cursor.close();
}
} catch (Exception e) {
System.out.println("[ERROR] Error connecting to MongoDB. " + e.getMessage());
}
} else {
System.out.println("[ERROR] mongoDB could not be initiallized. No operation with DB will be performed");
}
return parks;
}
@Override
public List<OldNoteInfo> list(AuthenticationInfo subject) throws IOException {
syncId();
List<OldNoteInfo> infos = new LinkedList<>();
MongoCursor<Document> cursor = coll.find().iterator();
while (cursor.hasNext()) {
Document doc = cursor.next();
Note note = documentToNote(doc);
OldNoteInfo info = new OldNoteInfo(note);
infos.add(info);
}
cursor.close();
return infos;
}
public boolean fileHasProperties(String name, String bucketName, Map<String, String> attrs) {
GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
MongoCursor it = bucket.find(Document.parse(String.format("{ \"filename\": \"%s\" }", name))).iterator();
boolean retVal = false;
if (it.hasNext()) {
GridFSFile file = (GridFSFile)it.next();
Document metadata = file.getMetadata();
if (metadata != null && metadata.size() == attrs.size()) {
retVal = true;
for (Map.Entry<String, Object> entry : metadata.entrySet()) {
Object val = attrs.get(entry.getKey());
if (val == null || !entry.getValue().equals(val)) {
retVal = false;
break;
}
}
}
}
it.close();
return retVal;
}
@Before
public void setUp() {
mongoContainer.initializeReplicaSet();
mongo = mongoContainer.newMongoClient();
// workaround to obtain a timestamp before starting the test
// If you pass a timestamp which is not in the oplog, mongodb throws exception
MongoCollection<Document> collection = collection("START_AT_OPERATION");
MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch().iterator();
collection.insertOne(new Document("key", "val"));
startAtOperationTime = cursor.next().getClusterTime();
cursor.close();
jet = createJetMember();
}
@Override
public String getSecret(String username) {
FindIterable<Document> findIterable = userCollection.find(eq("username", username)).limit(1);
MongoCursor<Document> cursor = findIterable.iterator();
String secret = null;
if(cursor.hasNext())
secret = cursor.next().getString("password");
cursor.close();
return secret;
}
public List<Park> getWithin(float lat1, float lon1, float lat2, float lon2) {
System.out.println("[DEBUG] MongoDBConnection.getAll()");
ArrayList<Park> allParksList = new ArrayList<Park>();
if (mongoDB != null) {
try {
MongoCollection parks = mongoDB.getCollection(COLLECTION);
// make the query object
BasicDBObject spatialQuery = new BasicDBObject();
ArrayList<double[]> boxList = new ArrayList<double[]>();
boxList.add(new double[]{new Float(lat2), new Float(lon2)});
boxList.add(new double[]{new Float(lat1), new Float(lon1)});
BasicDBObject boxQuery = new BasicDBObject();
boxQuery.put("$box", boxList);
spatialQuery.put("pos", new BasicDBObject("$within", boxQuery));
System.out.println("Using spatial query: " + spatialQuery.toString());
MongoCursor<Document> cursor = parks.find(spatialQuery).iterator();
try {
while (cursor.hasNext()) {
allParksList.add(ParkReadConverter.convert(cursor.next()));
}
} finally {
cursor.close();
}
} catch (Exception e) {
System.out.println("[ERROR] Error connecting to MongoDB. " + e.getMessage());
}
} else {
System.out.println("[ERROR] mongoDB could not be initiallized. No operation with DB will be performed");
}
return allParksList;
}
@Override
public void execute() {
// 获取过滤条件
Document filterDoc = getFilterDoc();
if (logger.isDebugEnabled()) {
logger.debug("timestamp :{}", startTime);
logger.debug("filter cause :{}", filterDoc);
}
// 查找数据
Document document = null;
MongoCursor<Document> cursor = null;
try {
cursor = coll.find(filterDoc).iterator();
while (cursor.hasNext()) {
document = cursor.next();
handleBinary(document);
process(document);
}
if (document != null) {
DateTime dateTime = getLastDateTime(document.get(since_column));
startTime.updateIfNull(dateTime);
}
// 更新起始时间
startTime.add(getNextDuration());
} finally {
if (cursor != null) {
cursor.close();
}
}
}
/**
* Find documents of which type of _id is object ID, and change it to note ID.
* Since updating _id field is not allowed, remove original documents and insert
* new ones with string _id(note ID)
*/
private void syncId() {
// find documents whose id type is object id
MongoCursor<Document> cursor = coll.find(type("_id", BsonType.OBJECT_ID)).iterator();
// if there is no such document, exit
if (!cursor.hasNext())
return;
List<ObjectId> oldDocIds = new LinkedList<>(); // document ids need to update
List<Document> updatedDocs = new LinkedList<>(); // new documents to be inserted
while (cursor.hasNext()) {
Document doc = cursor.next();
// store original _id
ObjectId oldId = doc.getObjectId("_id");
oldDocIds.add(oldId);
// store the document with string _id (note id)
String noteId = doc.getString("id");
doc.put("_id", noteId);
updatedDocs.add(doc);
}
coll.insertMany(updatedDocs);
coll.deleteMany(in("_id", oldDocIds));
cursor.close();
}
private void removeCompletedCursors(final List<MongoCursor<Document>> cursorList,
final List<CanvasMatches> matchesList) {
MongoCursor<Document> cursor;
for (int i = matchesList.size() - 1; i >=0; i--) {
if (matchesList.get(i) == null) {
matchesList.remove(i);
cursor = cursorList.remove(i);
cursor.close();
}
}
}
/**
* Converts a query cursor to a list.
*
* @param cursor
* the query cursor.
* @return a list of documents.
*/
public List<Document> fromCursorToList(MongoCursor<Document> cursor) {
List<Document> list = new ArrayList<Document>();
while (cursor.hasNext()) {
list.add(cursor.next());
}
cursor.close();
return list;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile input = session.get();
if (input == null) {
return;
}
final String deleteQuery = getQuery(context, input);
final String queryAttribute = context.getProperty(QUERY_ATTRIBUTE).isSet()
? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
: null;
GridFSBucket bucket = getBucket(input, context);
try {
Document query = Document.parse(deleteQuery);
MongoCursor cursor = bucket.find(query).iterator();
if (cursor.hasNext()) {
GridFSFile file = (GridFSFile)cursor.next();
bucket.delete(file.getObjectId());
if (!StringUtils.isEmpty(queryAttribute)) {
input = session.putAttribute(input, queryAttribute, deleteQuery);
}
session.transfer(input, REL_SUCCESS);
} else {
getLogger().error(String.format("Query %s did not delete anything in %s", deleteQuery, bucket.getBucketName()));
session.transfer(input, REL_FAILURE);
}
cursor.close();
} catch (Exception ex) {
getLogger().error(String.format("Error deleting using query: %s", deleteQuery), ex);
session.transfer(input, REL_FAILURE);
}
}
public boolean fileExists(String name, String bucketName) {
GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
MongoCursor it = bucket.find(Document.parse(String.format("{ \"filename\": \"%s\" }", name))).iterator();
boolean retVal = it.hasNext();
it.close();
return retVal;
}
private Document findOne(Document query, Document projection) {
MongoCollection col = controllerService.getDatabase(databaseName).getCollection(collection);
MongoCursor<Document> it = (projection != null ? col.find(query).projection(projection) : col.find(query)).iterator();
Document retVal = it.hasNext() ? it.next() : null;
it.close();
return retVal;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger();
final Document query = context.getProperty(QUERY).isSet() ? Document.parse(context.getProperty(QUERY).getValue()) : null;
final Document projection = context.getProperty(PROJECTION).isSet() ? Document.parse(context.getProperty(PROJECTION).getValue()) : null;
final Document sort = context.getProperty(SORT).isSet() ? Document.parse(context.getProperty(SORT).getValue()) : null;
final MongoCollection<Document> collection = getCollection(context);
try {
final FindIterable<Document> it = query != null ? collection.find(query) : collection.find();
if (projection != null) {
it.projection(projection);
}
if (sort != null) {
it.sort(sort);
}
if (context.getProperty(LIMIT).isSet()) {
it.limit(context.getProperty(LIMIT).asInteger());
}
if (context.getProperty(BATCH_SIZE).isSet()) {
it.batchSize(context.getProperty(BATCH_SIZE).asInteger());
}
final MongoCursor<Document> cursor = it.iterator();
try {
FlowFile flowFile = null;
while (cursor.hasNext()) {
flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
IOUtils.write(cursor.next().toJson(), out);
}
});
session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
session.transfer(flowFile, REL_SUCCESS);
}
session.commit();
} finally {
cursor.close();
}
} catch (final RuntimeException e) {
context.yield();
session.rollback();
logger.error("Failed to execute query {} due to {}", new Object[] { query, e }, e);
}
}
@Test
public void testWithOnlyCmdFilter() throws Exception {
MongoDBOplogSource source = new MongoDBOplogSourceBuilder()
.connectionString("mongodb://" + mongoContainerIp + ":" + mongoContainerMappedPort)
//Skip old oplogs and just start with whatever this test is producing
.initialTs(getInitialTsFromCurrentTime())
.initialOrdinal(0)
.collection(OPLOG_COLLECTION)
//Just filter update oplogs
.filterOlogOpTypeFilter(Collections.singletonList(OplogOpType.CMD))
.initialTs(initialTs)
.initialOrdinal(0)
.build();
SourceRunner runner = new SourceRunner.Builder(MongoDBSource.class, source)
.addOutputLane(LANE)
.build();
MongoCursor<Document> cursor =
mongoCursorFindIterable
.filter(
Filters.and(
Filters.gt(MongoDBOplogSource.TIMESTAMP_FIELD, new BsonTimestamp(initialTs, 0)),
Filters.eq(MongoDBOplogSource.OP_TYPE_FIELD, OplogOpType.CMD.getOp())
)
)
.iterator();
runner.runInit();
String offset = "";
List<Record> records;
try {
//insert some testDocuments in collection1
Document document1 = new Document("c", 1);
Document document2 = new Document("c", 2);
Document document3 = new Document("c", 3);
Document document4 = new Document("c", 4);
Document document5 = new Document("c", 5);
Document document6 = new Document("c", 6);
Document document7 = new Document("c", 7);
testDocuments.insertMany(Arrays.asList(document1, document2, document3, document4, document5, document6, document7));
//Delete two records
DeleteResult deleteResult = testDocuments.deleteMany(Filters.gt("c", 5));
Assert.assertEquals(2, deleteResult.getDeletedCount());
//Update by Incrementing the field "c" by 1 for testDocuments 1, 2 and 3
UpdateResult updateResult = testDocuments.updateMany(Filters.lt("c", 4), new Document("$inc", new Document("c", 1)));
Assert.assertEquals(3, updateResult.getModifiedCount());
//Now create bunch of collections, these are the only records we should see.
int numberOfCollectionsToCreate = 5;
for (int i = 0; i < numberOfCollectionsToCreate; i++) {
mongoClient.getDatabase(DATABASE).createCollection(testCollectionName + "_" + i);
}
Pair<String, List<Record>> runOp = runSourceAndGetOp(runner, offset);
records = runOp.getRight();
//Only testDocuments with "CMD" op should be selected
Assert.assertEquals(5, records.size());
for (Record record : records) {
checkRecord(record, cursor.tryNext());
checkRecordForFields(record, OplogOpType.CMD.getOp(), DATABASE + ".$cmd");
}
} finally {
runner.runDestroy();
cursor.close();
}
}
private void writeMergedMatches(final List<MongoCollection<Document>> collectionList,
final Document query,
final Document projection,
final OutputStream outputStream)
throws IOException {
// exclude mongo id from results
final ProcessTimer timer = new ProcessTimer();
outputStream.write(OPEN_BRACKET);
int count = 0;
final int numberOfCollections = collectionList.size();
final List<MongoCursor<Document>> cursorList = new ArrayList<>(numberOfCollections);
final List<CanvasMatches> matchesList = new ArrayList<>(numberOfCollections);
try {
int numberOfCompletedCursors = 0;
MongoCollection<Document> collection;
for (int i = 0; i < numberOfCollections; i++) {
collection = collectionList.get(i);
cursorList.add(collection.find(query).projection(projection).sort(MATCH_ORDER_BY).iterator());
matchesList.add(null);
numberOfCompletedCursors += updateMatches(cursorList, matchesList, i);
}
if (numberOfCompletedCursors > 0) {
removeCompletedCursors(cursorList, matchesList);
}
CanvasMatches mergedMatches;
while (matchesList.size() > 0) {
if (count > 0) {
outputStream.write(COMMA_WITH_NEW_LINE);
}
mergedMatches = getNextMergedMatches(cursorList, matchesList);
outputStream.write(mergedMatches.toJson().getBytes());
count++;
if (timer.hasIntervalPassed()) {
LOG.debug("writeMergedMatches: data written for {} matches", count);
}
}
} finally {
for (final MongoCursor<Document> cursor : cursorList) {
if (cursor != null) {
try {
cursor.close();
} catch (final Throwable t) {
LOG.error("failed to close cursor, ignoring exception", t);
}
}
}
}
outputStream.write(CLOSE_BRACKET);
if (LOG.isDebugEnabled()) {
final StringBuilder collectionNames = new StringBuilder(512);
for (int i = 0; i < collectionList.size(); i++) {
if (i > 0) {
collectionNames.append('|');
}
collectionNames.append(MongoUtil.fullName(collectionList.get(i)));
}
LOG.debug("writeMergedMatches: wrote data for {} matches returned by {}.find({},{}).sort({}), elapsedSeconds={}",
count, collectionNames, query.toJson(), projection.toJson(), MATCH_ORDER_BY_JSON, timer.getElapsedSeconds());
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = null;
if (context.hasIncomingConnection()) {
flowFile = session.get();
if (flowFile == null && context.hasNonLoopConnection()) {
return;
}
}
final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
final Boolean allowDiskUse = context.getProperty(ALLOW_DISK_USE).asBoolean();
final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
final Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
final String dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions(flowFile).getValue();
configureMapper(jsonTypeSetting, dateFormat);
Map<String, String> attrs = new HashMap<>();
if (queryAttr != null && queryAttr.trim().length() > 0) {
attrs.put(queryAttr, query);
}
MongoCursor<Document> iter = null;
try {
MongoCollection<Document> collection = getCollection(context, flowFile);
List<Bson> aggQuery = buildAggregationQuery(query);
AggregateIterable<Document> it = collection.aggregate(aggQuery).allowDiskUse(allowDiskUse);;
it.batchSize(batchSize != null ? batchSize : 1);
iter = it.iterator();
List<Document> batch = new ArrayList<>();
Boolean doneSomething = false;
while (iter.hasNext()) {
batch.add(iter.next());
if (batch.size() == resultsPerFlowfile) {
writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS);
batch = new ArrayList<>();
doneSomething |= true;
}
}
if (! batch.isEmpty()) {
// Something remains in batch list, write it to RESULT
writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS);
} else if (! doneSomething) {
// The batch list is empty and no batch was written (empty result!), so write empty string to RESULT
writeBatch("", flowFile, context, session, attrs, REL_RESULTS);
}
if (flowFile != null) {
session.transfer(flowFile, REL_ORIGINAL);
}
} catch (Exception e) {
getLogger().error("Error running MongoDB aggregation query.", e);
if (flowFile != null) {
session.transfer(flowFile, REL_FAILURE);
}
} finally {
if (iter != null) {
iter.close();
}
}
}