下面列出了 com.mongodb.DBCollection#getIndexInfo() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds the executing-job queue store and prepares its collection.
 *
 * <p>The collection name is set to {@code JobQueueUtils.EXECUTING_JOB_QUEUE}. If the
 * collection reports at most one index (presumably only MongoDB's default {@code _id}
 * index - confirm), the queue's indexes are created.
 *
 * @param config configuration handed to the superclass constructor
 */
public MongoExecutingJobQueue(Config config) {
    super(config);
    // Single-table store: the collection name is fixed.
    setTableName(JobQueueUtils.EXECUTING_JOB_QUEUE);

    // Fetching the collection and listing its indexes tells us whether setup already ran.
    List<DBObject> existingIndexes = template.getCollection().getIndexInfo();
    boolean alreadyIndexed = CollectionUtils.sizeOf(existingIndexes) > 1;
    if (alreadyIndexed) {
        return;
    }

    // NOTE(review): the two trailing booleans look like unique / dropDups flags - confirm
    // against the template's ensureIndex signature.
    template.ensureIndex("idx_jobId", "jobId", true, true);
    template.ensureIndex("idx_taskId_taskTrackerNodeGroup", "taskId, taskTrackerNodeGroup", true, true);
    template.ensureIndex("idx_realTaskId_taskTrackerNodeGroup", "realTaskId, taskTrackerNodeGroup");
    template.ensureIndex("idx_jobType", "jobType");
    template.ensureIndex("idx_taskTrackerIdentity", "taskTrackerIdentity");
    template.ensureIndex("idx_gmtCreated", "gmtCreated");
}
/**
 * Builds the cron-job queue store and prepares its collection.
 *
 * <p>The collection name is set to {@code JobQueueUtils.CRON_JOB_QUEUE}. If the
 * collection reports at most one index (presumably only the default {@code _id}
 * index - confirm), the queue's indexes are created.
 *
 * @param config configuration handed to the superclass constructor
 */
public MongoCronJobQueue(Config config) {
    super(config);
    // Single-table store: the collection name is fixed.
    setTableName(JobQueueUtils.CRON_JOB_QUEUE);

    List<DBObject> existingIndexes = template.getCollection().getIndexInfo();
    boolean alreadyIndexed = CollectionUtils.sizeOf(existingIndexes) > 1;
    if (alreadyIndexed) {
        return;
    }

    // NOTE(review): the two trailing booleans look like unique / dropDups flags - confirm.
    template.ensureIndex("idx_jobId", "jobId", true, true);
    template.ensureIndex("idx_taskId_taskTrackerNodeGroup", "taskId, taskTrackerNodeGroup", true, true);
    template.ensureIndex("idx_realTaskId_taskTrackerNodeGroup", "realTaskId, taskTrackerNodeGroup");
    template.ensureIndex("idx_relyOnPrevCycle_lgtt", "relyOnPrevCycle, lastGenerateTriggerTime");
}
/**
 * Builds the repeat-job queue store and prepares its collection.
 *
 * <p>The collection name is set to {@code JobQueueUtils.REPEAT_JOB_QUEUE}. If the
 * collection reports at most one index (presumably only the default {@code _id}
 * index - confirm), the queue's indexes are created.
 *
 * @param config configuration handed to the superclass constructor
 */
public MongoRepeatJobQueue(Config config) {
    super(config);
    // Single-table store: the collection name is fixed.
    setTableName(JobQueueUtils.REPEAT_JOB_QUEUE);

    List<DBObject> existingIndexes = template.getCollection().getIndexInfo();
    boolean alreadyIndexed = CollectionUtils.sizeOf(existingIndexes) > 1;
    if (alreadyIndexed) {
        return;
    }

    // NOTE(review): the two trailing booleans look like unique / dropDups flags - confirm.
    template.ensureIndex("idx_jobId", "jobId", true, true);
    template.ensureIndex("idx_taskId_taskTrackerNodeGroup", "taskId, taskTrackerNodeGroup", true, true);
    template.ensureIndex("idx_realTaskId_taskTrackerNodeGroup", "realTaskId, taskTrackerNodeGroup");
    template.ensureIndex("idx_relyOnPrevCycle_lgtt", "relyOnPrevCycle, lastGenerateTriggerTime");
}
/**
 * Prepares the executable-job queue collection for one task-tracker node group.
 *
 * <p>The collection name comes from {@code JobQueueUtils.getExecutableQueueName}. When
 * the collection reports at most one index, the queue's indexes are created and the
 * creation is logged. The name is added to {@code EXIST_TABLE} in either case.
 *
 * @param taskTrackerNodeGroup node group whose queue collection should be prepared
 * @return always {@code true}
 */
@Override
public boolean createQueue(String taskTrackerNodeGroup) {
    final String queueName = JobQueueUtils.getExecutableQueueName(taskTrackerNodeGroup);

    List<DBObject> existingIndexes = template.getCollection(queueName).getIndexInfo();
    boolean needsIndexes = CollectionUtils.sizeOf(existingIndexes) <= 1;
    if (needsIndexes) {
        // NOTE(review): the two trailing booleans look like unique / dropDups flags - confirm.
        template.ensureIndex(queueName, "idx_jobId", "jobId", true, true);
        template.ensureIndex(queueName, "idx_taskId_taskTrackerNodeGroup", "taskId, taskTrackerNodeGroup", true, true);
        template.ensureIndex(queueName, "idx_taskTrackerIdentity", "taskTrackerIdentity");
        template.ensureIndex(queueName, "idx_jobType", "jobType");
        template.ensureIndex(queueName, "idx_realTaskId_taskTrackerNodeGroup", "realTaskId, taskTrackerNodeGroup");
        template.ensureIndex(queueName, "idx_priority_triggerTime_gmtCreated", "priority,triggerTime,gmtCreated");
        template.ensureIndex(queueName, "idx_isRunning", "isRunning");
        LOGGER.info("create queue " + queueName);
    }

    // Recorded whether or not indexes had to be created on this call.
    EXIST_TABLE.add(queueName);
    return true;
}
/**
 * Builds the suspended-job queue store and prepares its collection.
 *
 * <p>The collection name is set to {@code JobQueueUtils.SUSPEND_JOB_QUEUE}. If the
 * collection reports at most one index (presumably only the default {@code _id}
 * index - confirm), the queue's indexes are created.
 *
 * @param config configuration handed to the superclass constructor
 */
public MongoSuspendJobQueue(Config config) {
    super(config);
    // Single-table store: the collection name is fixed.
    setTableName(JobQueueUtils.SUSPEND_JOB_QUEUE);

    List<DBObject> existingIndexes = template.getCollection().getIndexInfo();
    boolean alreadyIndexed = CollectionUtils.sizeOf(existingIndexes) > 1;
    if (alreadyIndexed) {
        return;
    }

    // NOTE(review): the two trailing booleans look like unique / dropDups flags - confirm.
    template.ensureIndex("idx_jobId", "jobId", true, true);
    template.ensureIndex("idx_jobType", "jobType");
    template.ensureIndex("idx_taskId_taskTrackerNodeGroup", "taskId, taskTrackerNodeGroup", true, true);
    template.ensureIndex("idx_realTaskId_taskTrackerNodeGroup", "realTaskId, taskTrackerNodeGroup");
}
/**
 * Collects a {@code MongoIndex} for every index of every collection in the database.
 *
 * <p>For each index document the {@code "key"} entry is used as the key object, and
 * the {@code "unique"} and {@code "sparse"} entries are read as booleans through their
 * string form; a missing entry counts as {@code false}.
 *
 * @param database database whose collections are inspected
 * @return set of the indexes found; empty if the database lists no collections
 */
protected static Set<MongoIndex> loadIndexInfoFromDB(DB database) {
    Set<MongoIndex> discovered = new HashSet<MongoIndex>();
    for (String name : database.getCollectionNames()) {
        for (DBObject indexSpec : database.getCollection(name).getIndexInfo()) {
            DBObject keys = (DBObject) indexSpec.get("key");
            Object rawUnique = indexSpec.get("unique");
            Object rawSparse = indexSpec.get("sparse");
            // Absent flags stay false; present ones are parsed from their toString() form.
            boolean sparse = rawSparse != null && Boolean.parseBoolean(rawSparse.toString());
            boolean unique = rawUnique != null && Boolean.parseBoolean(rawUnique.toString());
            discovered.add(new MongoIndex(name, unique, keys, sparse));
        }
    }
    return discovered;
}
/**
 * Builds the Mongo-backed job logger and prepares its collection.
 *
 * <p>The collection name is set to {@code "lts_job_log_po"}. If the collection reports
 * at most one index (presumably only the default {@code _id} index - confirm), the
 * logger's indexes are created.
 *
 * @param config configuration handed to the superclass constructor
 */
public MongoJobLogger(Config config) {
    super(config);
    setTableName("lts_job_log_po");

    List<DBObject> existingIndexes = template.getCollection().getIndexInfo();
    boolean alreadyIndexed = CollectionUtils.sizeOf(existingIndexes) > 1;
    if (alreadyIndexed) {
        return;
    }

    template.ensureIndex("idx_logTime", "logTime");
    template.ensureIndex("idx_taskId_taskTrackerNodeGroup", "taskId,taskTrackerNodeGroup");
    template.ensureIndex("idx_realTaskId_taskTrackerNodeGroup", "realTaskId, taskTrackerNodeGroup");
}
/**
 * Builds the node-group store and prepares its collection.
 *
 * <p>The collection name is set to {@code JobQueueUtils.NODE_GROUP_STORE}. If the
 * collection reports at most one index (presumably only the default {@code _id}
 * index - confirm), the {@code idx_nodeType_name} index is created.
 *
 * @param config configuration handed to the superclass constructor
 */
public MongoNodeGroupStore(Config config) {
    super(config);
    setTableName(JobQueueUtils.NODE_GROUP_STORE);

    List<DBObject> existingIndexes = template.getCollection().getIndexInfo();
    boolean alreadyIndexed = CollectionUtils.sizeOf(existingIndexes) > 1;
    if (alreadyIndexed) {
        return;
    }

    // NOTE(review): the two trailing booleans look like unique / dropDups flags - confirm.
    template.ensureIndex("idx_nodeType_name", "nodeType,name", true, true);
}
/**
 * Prepares the feedback queue collection for one job-client node group.
 *
 * <p>The collection name comes from {@code JobQueueUtils.getFeedbackQueueName}. When
 * the collection reports at most one index, the {@code idx_gmtCreated} index is created
 * and the creation is logged.
 *
 * @param jobClientNodeGroup node group whose feedback collection should be prepared
 * @return always {@code true}
 */
@Override
public boolean createQueue(String jobClientNodeGroup) {
    final String queueName = JobQueueUtils.getFeedbackQueueName(jobClientNodeGroup);

    List<DBObject> existingIndexes = template.getCollection(queueName).getIndexInfo();
    if (CollectionUtils.sizeOf(existingIndexes) > 1) {
        // More than one index is already present, so nothing is created.
        return true;
    }

    template.ensureIndex(queueName, "idx_gmtCreated", "gmtCreated");
    LOGGER.info("create queue " + queueName);
    return true;
}
/**
 * Checks that every data collection of the feed ends up with one index besides its
 * first one, using the channel configured from
 * {@code plugin_config/indexing_task_all_channels.json}.
 *
 * <p>Data is pushed to the channel, the test sleeps, and then every collection other
 * than {@code system.indexes} must have exactly two indexes, the second of which is
 * named {@code data.v_1}.
 *
 * @throws Exception if channel setup, the data push, or the sleep fails
 */
@Test
public void shouldIndexAllCollections() throws Exception {
    String feedName = "feed1";
    String channelName = "channel1";
    String configPath = "plugin_config/indexing_task_all_channels.json";
    Channel channel = getConfiguredChannel(configPath, feedName, channelName);
    pushDataToChannel(channel, "v", 5000, 1, TimeUnit.SECONDS);

    // Wait for the task to complete.
    // NOTE(review): a fixed sleep is timing-sensitive; presumably the indexing task runs
    // asynchronously - consider polling for completion if this test proves flaky.
    Thread.sleep(2000);

    // Get the collections for the feed
    DB feedDB = this.testClient.getDB(feedName);
    Set<String> collNames = feedDB.getCollectionNames();
    assertEquals("Should have 5 data collections + system.indexes", 6, collNames.size());

    for (String collName : collNames) {
        if ("system.indexes".equals(collName)) {
            continue;
        }
        DBCollection coll = feedDB.getCollection(collName);
        List<DBObject> indexes = coll.getIndexInfo();
        assertEquals("Should have _id index plus one additional", 2, indexes.size());
        // Expected value goes first so a failure reports expected/actual the right way round.
        assertEquals("Should have data.v_1 index", "data.v_1", indexes.get(1).get("name"));
    }
}
/**
 * Checks that only some of the feed's data collections get indexed, using the channel
 * configured from {@code plugin_config/indexing_task_skip_channels.json}.
 *
 * <p>Data is pushed to the channel, the test sleeps, and then the collections other
 * than {@code system.indexes} that have exactly two indexes are counted. Each of those
 * must have a second index named {@code data.v_1}, and there must be exactly three.
 *
 * @throws Exception if channel setup, the data push, or the sleep fails
 */
@Test
public void shouldIndexSomeCollections() throws Exception {
    String feedName = "feed2";
    String channelName = "channel1";
    String configPath = "plugin_config/indexing_task_skip_channels.json";
    Channel channel = getConfiguredChannel(configPath, feedName, channelName);
    pushDataToChannel(channel, "v", 5000, 1, TimeUnit.SECONDS);

    // Wait for the task to complete.
    // NOTE(review): a fixed sleep is timing-sensitive; presumably the indexing task runs
    // asynchronously - consider polling for completion if this test proves flaky.
    Thread.sleep(2000);

    // Get the collections for the feed
    DB feedDB = this.testClient.getDB(feedName);
    Set<String> collNames = feedDB.getCollectionNames();
    assertEquals("Should have 5 data collections + system.indexes", 6, collNames.size());

    int indexedCount = 0;
    for (String collName : collNames) {
        if ("system.indexes".equals(collName)) {
            continue;
        }
        DBCollection coll = feedDB.getCollection(collName);
        List<DBObject> indexes = coll.getIndexInfo();
        if (indexes.size() == 2) {
            // Expected value goes first so a failure reports expected/actual the right way round.
            assertEquals("Should have data.v_1 index", "data.v_1", indexes.get(1).get("name"));
            indexedCount++;
        }
    }
    assertEquals("Should 3 indexed collections", 3, indexedCount);
}