下面列出了怎么用com.mongodb.Block的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* 查询每个shard 的主库地址
*/
private Map<String, String> getShardedUrls(MongoManager mongoManager, String username, String password) {
FindIterable<Document> documents = mongoManager.find("config", "shards");
Map<String, String> shardsUrlMap = new HashMap<>();
documents.forEach(new Block<Document>() {
@Override
public void apply(Document document) {
//格式:shard1/localhost:27017,localhost:27018,localhost:27019
String[] shardHosts = document.getString("host").split("/");
String shardName = shardHosts[0];
String url = shardHosts[1];
shardsUrlMap.put(shardName, url);
}
});
return shardsUrlMap;
}
NamespaceSynchronizationConfig(
final MongoCollection<NamespaceSynchronizationConfig> namespacesColl,
final MongoCollection<CoreDocumentSynchronizationConfig> docsColl,
final MongoNamespace namespace
) {
this.namespacesColl = namespacesColl;
this.docsColl = docsColl;
this.namespace = namespace;
this.syncedDocuments = new ConcurrentHashMap<>();
this.nsLock = new ReentrantReadWriteLock();
// Fill from db
final BsonDocument docsFilter = new BsonDocument();
docsFilter.put(
CoreDocumentSynchronizationConfig.ConfigCodec.Fields.NAMESPACE_FIELD,
new BsonString(namespace.toString()));
docsColl.find(docsFilter, CoreDocumentSynchronizationConfig.class)
.forEach(new Block<CoreDocumentSynchronizationConfig>() {
@Override
public void apply(@Nonnull final CoreDocumentSynchronizationConfig docConfig) {
syncedDocuments.put(docConfig.getDocumentId(), new CoreDocumentSynchronizationConfig(
docsColl,
docConfig));
}
});
}
public cfData execute( cfSession _session, cfArgStructData argStruct ) throws cfmRunTimeException {
MongoClient client = getMongoClient( _session, argStruct );
try {
cfArrayData arr = cfArrayData.createArray( 1 );
client.listDatabaseNames().forEach( new Block<String>() {
@Override
public void apply( final String st ) {
try {
arr.addElement( new cfStringData( st ) );
} catch ( cfmRunTimeException e ) {}
}
} );
return arr;
} catch ( MongoException me ) {
throwException( _session, me.getMessage() );
return null;
}
}
@Test
public void queryAll() {
// @begin: query-all
// @code: start
FindIterable<Document> iterable = db.getCollection("restaurants").find();
// @code: end
// @pre: Iterate the results and apply a block to each resulting document.
// @code: start
iterable.forEach(new Block<Document>() {
@Override
public void apply(final Document document) {
System.out.println(document.toJson());
}
});
// @code: end
// @end: query-all
}
@Test
public void logicalOr() {
// @begin: logical-or
// @code: start
FindIterable<Document> iterable = db.getCollection("restaurants").find(new Document("$or",
asList(new Document("cuisine", "Italian"), new Document("address.zipcode", "10075"))));
// @code: end
// @pre: Iterate the results and apply a block to each resulting document.
// @code: start
iterable.forEach(new Block<Document>() {
@Override
public void apply(final Document document) {
System.out.println(document);
}
});
// @code: end
// @pre: To simplify building queries the Java driver provides static helpers
// @code: start
db.getCollection("restaurants").find(or(eq("cuisine", "Italian"), eq("address.zipcode", "10075")));
// @code: end
// @end: logical-or
}
@Test
public void queryEmbeddedDocument() {
// @begin: query-embedded-document
// @code: start
FindIterable<Document> iterable = db.getCollection("restaurants")
.find(new Document("address.zipcode", "10075"));
// @code: end
// @pre: Iterate the results and apply a block to each resulting document.
// @code: start
iterable.forEach(new Block<Document>() {
@Override
public void apply(final Document document) {
System.out.println(document);
}
});
// @code: end
// @pre: To simplify building queries the Java driver provides static helpers
// @code: start
db.getCollection("restaurants").find(eq("address.zipcode", "10075"));
// @code: end
// @end: query-embedded-document
}
@Test
public void queryFieldInArray() {
// @begin: query-field-in-array
// @code: start
FindIterable<Document> iterable = db.getCollection("restaurants").find(new Document("grades.grade", "B"));
// @code: end
// @pre: Iterate the results and apply a block to each resulting document.
// @code: start
iterable.forEach(new Block<Document>() {
@Override
public void apply(final Document document) {
System.out.println(document);
}
});
// @code: end
// @pre: To simplify building queries the Java driver provides static helpers
// @code: start
db.getCollection("restaurants").find(eq("grades.grade", "B"));
// @code: end
// @end: query-field-in-array
}
public cfData execute(cfSession _session, cfArgStructData argStruct ) throws cfmRunTimeException {
MongoClient client = getMongoClient( _session, argStruct );
try{
cfArrayData arr = cfArrayData.createArray(1);
client.listDatabaseNames().forEach( new Block<String>(){
@Override
public void apply( String dbName ) {
try {
arr.addElement( new cfStringData( dbName ) );
} catch ( cfmRunTimeException e ) {}
}
});
return arr;
} catch (MongoException me){
throwException(_session, me.getMessage());
return null;
}
}
@Override
public Publisher<Success> rename(final ObjectId id, final String newFilename) {
return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<Success>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<Success> callback) {
wrapped.rename(id, newFilename, voidToSuccessCallback(callback));
}
}));
}
@Override
public Publisher<Success> drop() {
return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<Success>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<Success> callback) {
wrapped.drop(voidToSuccessCallback(callback));
}
}));
}
@Override
public Publisher<GridFSFile> first() {
return new ObservableToPublisher<GridFSFile>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<GridFSFile>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<GridFSFile> callback) {
wrapped.first(callback);
}
}));
}
@Override
public ResponseEntity<List> getGroupedOnlineData() {
List<Map<String, Object>> group = new ArrayList<>();
Block<Document> printBlock = m -> {
Map<String, Object> map = new HashMap<>(m);
group.add(map);
};
mongoClient.getDatabase("biliob").
getCollection("site_info").
aggregate(Arrays.asList(group(eq("$month", "$datetime"), max("datetime", "$datetime"), max("play_online", "$play_online"), max("web_online", "$web_online"), max("all_count", "$all_count")), sort(descending("datetime")), limit(12))).forEach(printBlock);
return new ResponseEntity<>(group, HttpStatus.OK);
}
@Override
public Publisher<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests,
final BulkWriteOptions options) {
return new ObservableToPublisher<BulkWriteResult>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<BulkWriteResult>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<BulkWriteResult> callback) {
wrapped.bulkWrite(requests, options, callback);
}
}));
}
@Override
public Publisher<ObjectId> uploadFromStream(final ClientSession clientSession, final String filename,
final com.mongodb.reactivestreams.client.gridfs.AsyncInputStream source,
final GridFSUploadOptions options) {
return new ObservableToPublisher<ObjectId>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<ObjectId>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<ObjectId> callback) {
wrapped.uploadFromStream(clientSession.getWrapped(), filename, toCallbackAsyncInputStream(source), options,
callback);
}
}));
}
@Override
public Publisher<DeleteResult> deleteMany(final ClientSession clientSession, final Bson filter, final DeleteOptions options) {
return new ObservableToPublisher<DeleteResult>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<DeleteResult>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<DeleteResult> callback) {
wrapped.deleteMany(clientSession.getWrapped(), filter, options, callback);
}
}));
}
@Override
public <TResult> Publisher<TResult> runCommand(final Bson command, final ReadPreference readPreference,
final Class<TResult> clazz) {
return new ObservableToPublisher<TResult>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<TResult>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<TResult> callback) {
wrapped.runCommand(command, readPreference, clazz, callback);
}
}));
}
@Override
@Deprecated
public Publisher<UpdateResult> replaceOne(final Bson filter, final TDocument replacement, final UpdateOptions options) {
return new ObservableToPublisher<UpdateResult>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<UpdateResult>>() {
@SuppressWarnings("deprecation")
@Override
public void apply(final com.mongodb.async.SingleResultCallback<UpdateResult> callback) {
wrapped.replaceOne(filter, replacement, options, callback);
}
}));
}
@Override
public Publisher<Long> skip(final long bytesToSkip) {
return new ObservableToPublisher<Long>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<Long>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<Long> callback) {
wrapped.skip(bytesToSkip, callback);
}
}));
}
@Override
public Publisher<Success> rename(final ClientSession clientSession, final BsonValue id, final String newFilename) {
return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<Success>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<Success> callback) {
wrapped.rename(clientSession.getWrapped(), id, newFilename, voidToSuccessCallback(callback));
}
}));
}
@Override
public Publisher<DeleteResult> deleteOne(final Bson filter, final DeleteOptions options) {
return new ObservableToPublisher<DeleteResult>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<DeleteResult>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<DeleteResult> callback) {
wrapped.deleteOne(filter, options, callback);
}
}));
}
@Override
public Publisher<Success> drop(final ClientSession clientSession) {
return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<Success>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<Success> callback) {
wrapped.drop(clientSession.getWrapped(), voidToSuccessCallback(callback));
}
}));
}
@Override
public Observable<Success> delete(final ObjectId id) {
return RxObservables.create(Observables.observe(new Block<SingleResultCallback<Success>>() {
@Override
public void apply(final SingleResultCallback<Success> callback) {
wrapped.delete(id, voidToSuccessCallback(callback));
}
}), observableAdapter);
}
public static void main(String[] args) {
Set<Integer> buildIds = new HashSet<>();
String dbCollectionUrl = args[0];
String dbName = args[1];
String collectionName = args[2];
MongoConnection mongoConnection = new MongoConnection(dbCollectionUrl, dbName);
MongoDatabase database = mongoConnection.getMongoDatabase();
MongoCollection collection = database.getCollection(collectionName);
Block<Document> block = new Block<Document>(){
@Override
public void apply(Document document) {
int buildId = document.getInteger("buildId");
ObjectId id = document.getObjectId("_id");
if (buildIds.contains(buildId)) {
collection.deleteOne(eq("_id", id));
counterDeleted++;
return;
} else {
buildIds.add(buildId);
counterKept++;
}
}
};
collection.find().sort(orderBy(descending("buildReproductionDate"))).forEach(
block
);
System.out.println(counterDeleted+" entries deleted and "+counterKept+" kept.");
}
public static void logicalAnd() {
// @begin: logical-and
// @code: start
// FindIterable<Document> iterable = db.getCollection("box_vgg_fea").find(new
// Document("domain", "shopbop")).limit(10);
FindIterable<Document> iterable = db.getCollection("box_vgg_fea").find().limit(10);
// @code: end
// @pre: Iterate the results and apply a block to each resulting document.
// @code: start
iterable.forEach(new Block<Document>() {
@Override
public void apply(final Document document) {
log.info(document.toJson());
}
});
// @code: end
// @pre: To simplify building queries the Java driver provides static helpers
// @code: start
// db.getCollection("restaurants").find(and(eq("cuisine", "Italian"),
// eq("address.zipcode",
// "10075")));
// @code: end
// @end: logical-and
}
@Override
public Publisher<Integer> write(final ByteBuffer src) {
return new ObservableToPublisher<Integer>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<Integer>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<Integer> callback) {
wrapped.write(src, callback);
}
}));
}
@Override
public Publisher<Success> toCollection() {
return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<Success>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<Success> callback) {
wrapped.toCollection(voidToSuccessCallback(callback));
}
}));
}
@Override
public Publisher<UpdateResult> updateMany(final ClientSession clientSession, final Bson filter, final List<? extends Bson> update,
final UpdateOptions options) {
return new ObservableToPublisher<UpdateResult>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<UpdateResult>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<UpdateResult> callback) {
wrapped.updateMany(clientSession.getWrapped(), filter, update, options, callback);
}
}));
}
@Override
public Publisher<Long> downloadToStream(final ObjectId id,
final com.mongodb.reactivestreams.client.gridfs.AsyncOutputStream destination) {
return new ObservableToPublisher<Long>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<Long>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<Long> callback) {
wrapped.downloadToStream(id, toCallbackAsyncOutputStream(destination), callback);
}
}));
}
@Override
public Publisher<Success> createView(final ClientSession clientSession, final String viewName, final String viewOn,
final List<? extends Bson> pipeline, final CreateViewOptions createViewOptions) {
return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<Success>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<Success> callback) {
wrapped.createView(clientSession.getWrapped(), viewName, viewOn, pipeline, createViewOptions,
voidToSuccessCallback(callback));
}
}));
}
@Override
public Publisher<Success> insertOne(final ClientSession clientSession, final TDocument document, final InsertOneOptions options) {
return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<Success>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<Success> callback) {
wrapped.insertOne(clientSession.getWrapped(), document, options, voidToSuccessCallback(callback));
}
}));
}