下面列出了怎么用org.apache.lucene.index.ConcurrentMergeScheduler的API类实例代码及写法,或者点击链接到github查看源代码。
/** just tries to configure things to keep the open file
* count lowish */
public static void reduceOpenFiles(IndexWriter w) {
// keep number of open files lowish
MergePolicy mp = w.getConfig().getMergePolicy();
mp.setNoCFSRatio(1.0);
if (mp instanceof LogMergePolicy) {
LogMergePolicy lmp = (LogMergePolicy) mp;
lmp.setMergeFactor(Math.min(5, lmp.getMergeFactor()));
} else if (mp instanceof TieredMergePolicy) {
TieredMergePolicy tmp = (TieredMergePolicy) mp;
tmp.setMaxMergeAtOnce(Math.min(5, tmp.getMaxMergeAtOnce()));
tmp.setSegmentsPerTier(Math.min(5, tmp.getSegmentsPerTier()));
}
MergeScheduler ms = w.getConfig().getMergeScheduler();
if (ms instanceof ConcurrentMergeScheduler) {
// wtf... shouldnt it be even lower since it's 1 by default?!?!
((ConcurrentMergeScheduler) ms).setMaxMergesAndThreads(3, 2);
}
}
@Test
public void testTieredMPSolrIndexConfigCreation() throws Exception {
String solrConfigFileName = solrConfigFileNameTieredMergePolicyFactory;
SolrConfig solrConfig = new SolrConfig(instanceDir, solrConfigFileName);
SolrIndexConfig solrIndexConfig = new SolrIndexConfig(solrConfig, null, null);
IndexSchema indexSchema = IndexSchemaFactory.buildIndexSchema(schemaFileName, solrConfig);
h.getCore().setLatestSchema(indexSchema);
IndexWriterConfig iwc = solrIndexConfig.toIndexWriterConfig(h.getCore());
assertNotNull("null mp", iwc.getMergePolicy());
assertTrue("mp is not TieredMergePolicy", iwc.getMergePolicy() instanceof TieredMergePolicy);
TieredMergePolicy mp = (TieredMergePolicy) iwc.getMergePolicy();
assertEquals("mp.maxMergeAtOnceExplicit", 19, mp.getMaxMergeAtOnceExplicit());
assertEquals("mp.segmentsPerTier",9,(int)mp.getSegmentsPerTier());
assertNotNull("null ms", iwc.getMergeScheduler());
assertTrue("ms is not CMS", iwc.getMergeScheduler() instanceof ConcurrentMergeScheduler);
ConcurrentMergeScheduler ms = (ConcurrentMergeScheduler) iwc.getMergeScheduler();
assertEquals("ms.maxMergeCount", 987, ms.getMaxMergeCount());
assertEquals("ms.maxThreadCount", 42, ms.getMaxThreadCount());
assertEquals("ms.isAutoIOThrottle", true, ms.getAutoIOThrottle());
}
@Test
public void testConcurrentMergeSchedularSolrIndexConfigCreation() throws Exception {
String solrConfigFileName = solrConfigFileNameConnMSPolicyFactory;
SolrConfig solrConfig = new SolrConfig(instanceDir, solrConfigFileName);
SolrIndexConfig solrIndexConfig = new SolrIndexConfig(solrConfig, null, null);
IndexSchema indexSchema = IndexSchemaFactory.buildIndexSchema(schemaFileName, solrConfig);
h.getCore().setLatestSchema(indexSchema);
IndexWriterConfig iwc = solrIndexConfig.toIndexWriterConfig(h.getCore());
assertNotNull("null mp", iwc.getMergePolicy());
assertTrue("mp is not TieredMergePolicy", iwc.getMergePolicy() instanceof TieredMergePolicy);
assertNotNull("null ms", iwc.getMergeScheduler());
assertTrue("ms is not CMS", iwc.getMergeScheduler() instanceof ConcurrentMergeScheduler);
ConcurrentMergeScheduler ms = (ConcurrentMergeScheduler) iwc.getMergeScheduler();
assertEquals("ms.maxMergeCount", 987, ms.getMaxMergeCount());
assertEquals("ms.maxThreadCount", 42, ms.getMaxThreadCount());
assertEquals("ms.isAutoIOThrottle", false, ms.getAutoIOThrottle());
}
private IndexWriterCount createWriter(final File indexDirectory) throws IOException {
final List<Closeable> closeables = new ArrayList<>();
final Directory directory = FSDirectory.open(indexDirectory);
closeables.add(directory);
try {
final Analyzer analyzer = new StandardAnalyzer();
closeables.add(analyzer);
final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
final ConcurrentMergeScheduler mergeScheduler = new ConcurrentMergeScheduler();
final int mergeThreads = repoConfig.getConcurrentMergeThreads();
mergeScheduler.setMaxMergesAndThreads(mergeThreads, mergeThreads);
config.setMergeScheduler(mergeScheduler);
final IndexWriter indexWriter = new IndexWriter(directory, config);
final EventIndexWriter eventIndexWriter = new LuceneEventIndexWriter(indexWriter, indexDirectory);
final IndexWriterCount writerCount = new IndexWriterCount(eventIndexWriter, analyzer, directory, 1, false);
logger.debug("Providing new index writer for {}", indexDirectory);
return writerCount;
} catch (final IOException ioe) {
for (final Closeable closeable : closeables) {
try {
closeable.close();
} catch (final IOException ioe2) {
ioe.addSuppressed(ioe2);
}
}
throw ioe;
}
}
public IndexWriterConfig createIndexWriterConfig() throws IOException {
IndexWriterConfig indexWriterConfig = new IndexWriterConfig(Version.LATEST, getIndexAnalyzerInstance());
indexWriterConfig.setRAMBufferSizeMB(48);
MergePolicy mergePolicy = getPluginManager().getInstance(LindenConfigBuilder.MERGE_POLICY, MergePolicy.class);
if (mergePolicy != null) {
indexWriterConfig.setMergePolicy(mergePolicy);
}
LOGGER.info("Merge policy : {}", mergePolicy == null ? "Default" : mergePolicy);
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
cms.setMaxMergesAndThreads(8, 1);
indexWriterConfig.setMergeScheduler(cms);
return indexWriterConfig;
}
public void testReferenceDecrementIllegally() throws Exception {
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
new MockAnalyzer(random())).setMergeScheduler(new ConcurrentMergeScheduler()));
@SuppressWarnings("resource")
SearcherManager sm = new SearcherManager(writer, false, false, new SearcherFactory());
writer.addDocument(new Document());
writer.commit();
sm.maybeRefreshBlocking();
IndexSearcher acquire = sm.acquire();
IndexSearcher acquire2 = sm.acquire();
sm.release(acquire);
sm.release(acquire2);
acquire = sm.acquire();
acquire.getIndexReader().decRef();
sm.release(acquire);
expectThrows(IllegalStateException.class, () -> {
sm.acquire();
});
// sm.close(); -- already closed
writer.close();
dir.close();
}
@SuppressWarnings({"unchecked"})
private MergeScheduler buildMergeScheduler(SolrResourceLoader resourceLoader) {
String msClassName = mergeSchedulerInfo == null ? SolrIndexConfig.DEFAULT_MERGE_SCHEDULER_CLASSNAME : mergeSchedulerInfo.className;
MergeScheduler scheduler = resourceLoader.newInstance(msClassName, MergeScheduler.class);
if (mergeSchedulerInfo != null) {
// LUCENE-5080: these two setters are removed, so we have to invoke setMaxMergesAndThreads
// if someone has them configured.
if (scheduler instanceof ConcurrentMergeScheduler) {
@SuppressWarnings({"rawtypes"})
NamedList args = mergeSchedulerInfo.initArgs.clone();
Integer maxMergeCount = (Integer) args.remove("maxMergeCount");
if (maxMergeCount == null) {
maxMergeCount = ((ConcurrentMergeScheduler) scheduler).getMaxMergeCount();
}
Integer maxThreadCount = (Integer) args.remove("maxThreadCount");
if (maxThreadCount == null) {
maxThreadCount = ((ConcurrentMergeScheduler) scheduler).getMaxThreadCount();
}
((ConcurrentMergeScheduler)scheduler).setMaxMergesAndThreads(maxMergeCount, maxThreadCount);
Boolean ioThrottle = (Boolean) args.remove("ioThrottle");
if (ioThrottle != null && !ioThrottle) { //by-default 'enabled'
((ConcurrentMergeScheduler) scheduler).disableAutoIOThrottle();
}
SolrPluginUtils.invokeSetters(scheduler, args);
} else {
SolrPluginUtils.invokeSetters(scheduler, mergeSchedulerInfo.initArgs);
}
}
return scheduler;
}
private IndexWriterCount createWriter(final File indexDirectory) throws IOException {
final List<Closeable> closeables = new ArrayList<>();
final Directory directory = FSDirectory.open(indexDirectory.toPath());
closeables.add(directory);
try {
final Analyzer analyzer = new StandardAnalyzer();
closeables.add(analyzer);
final IndexWriterConfig config = new IndexWriterConfig(analyzer);
final ConcurrentMergeScheduler mergeScheduler = new ConcurrentMergeScheduler();
final int mergeThreads = repoConfig.getConcurrentMergeThreads();
mergeScheduler.setMaxMergesAndThreads(mergeThreads, mergeThreads);
config.setMergeScheduler(mergeScheduler);
final IndexWriter indexWriter = new IndexWriter(directory, config);
final EventIndexWriter eventIndexWriter = new LuceneEventIndexWriter(indexWriter, indexDirectory);
final IndexWriterCount writerCount = new IndexWriterCount(eventIndexWriter, analyzer, directory, 1, false);
logger.debug("Providing new index writer for {}", indexDirectory);
return writerCount;
} catch (final IOException ioe) {
for (final Closeable closeable : closeables) {
try {
closeable.close();
} catch (final IOException ioe2) {
ioe.addSuppressed(ioe2);
}
}
throw ioe;
}
}
private static IndexWriterConfig createIndexConfig(Analyzer analyzer) {
IndexWriterConfig config = new IndexWriterConfig(analyzer);
config.setMergeScheduler(new ConcurrentMergeScheduler());
config.setCodec(new Lucene70CodecWithNoFieldCompression());
return config;
}
public static void syncConcurrentMerges(MergeScheduler ms) {
if (ms instanceof ConcurrentMergeScheduler)
((ConcurrentMergeScheduler) ms).sync();
}
@SuppressWarnings({"ConstantConditions", "PointlessBooleanExpression"})
@Nightly
public void testTriggerUnInvertLimit() throws IOException {
final boolean SHOULD_TRIGGER = false; // Set this to true to use the test with the old implementation
// Ensure enough terms inside of a single UnInvert-pass-structure to trigger the limit
final int REF_LIMIT = (int) Math.pow(2, 24); // Maximum number of references within a single pass-structure
final int DOCS = (1<<16)-1; // The number of documents within a single pass (simplified)
final int TERMS = REF_LIMIT/DOCS; // Each document must have this many references aka terms hit limit
// disk based Directory and IWC settings to reduce risk of OOM
Directory dir = newFSDirectory(createTempDir("TestDocTermOrdsUninvertLimit"));
final IndexWriter w = new IndexWriter(dir,
new IndexWriterConfig(new MockAnalyzer(random()))
.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH)
.setRAMBufferSizeMB(256.0)
.setMergeScheduler(new ConcurrentMergeScheduler())
.setMergePolicy(newLogMergePolicy(false, 10))
.setOpenMode(IndexWriterConfig.OpenMode.CREATE)
.setCodec(TestUtil.getDefaultCodec()));
Document doc = new Document();
Field field = newTextField("field", "", Field.Store.NO);
doc.add(field);
StringBuilder sb = new StringBuilder(TERMS*(Integer.toString(TERMS).length()+1));
for (int i = 0 ; i < TERMS ; i++) {
sb.append(" ").append(Integer.toString(i));
}
field.setStringValue(sb.toString());
for (int i = 0 ; i < DOCS ; i++) {
w.addDocument(doc);
}
//System.out.println("\n Finished adding " + DOCS + " documents of " + TERMS + " unique terms");
w.close();
final IndexReader r = DirectoryReader.open(dir);
try {
final LeafReader ar = SlowCompositeReaderWrapper.wrap(r);
TestUtil.checkReader(ar);
final DocTermOrds dto = new DocTermOrds(ar, ar.getLiveDocs(), "field"); // bigTerms turned off
if (SHOULD_TRIGGER) {
fail("DocTermOrds should have failed with a \"Too many values for UnInvertedField\" message");
}
} catch (IllegalStateException e) {
if (!SHOULD_TRIGGER) {
fail("DocTermsOrd should not have failed with this implementation, but got exception " +
e.getClass().getSimpleName() + " with message " + e.getMessage());
}
// This is (hopefully) "Too many values for UnInvertedField faceting on field field", so all is as expected
} finally {
r.close();
dir.close();
}
}
@Test
public void testDefaults() throws Exception {
int numDefaultsTested = 0;
int numNullDefaults = 0;
SolrConfig sc = new SolrConfig(TEST_PATH().resolve("collection1"), "solrconfig-defaults.xml");
SolrIndexConfig sic = sc.indexConfig;
++numDefaultsTested; assertEquals("default useCompoundFile", false, sic.useCompoundFile);
++numDefaultsTested; assertEquals("default maxBufferedDocs", -1, sic.maxBufferedDocs);
++numDefaultsTested; assertEquals("default ramBufferSizeMB", 100.0D, sic.ramBufferSizeMB, 0.0D);
++numDefaultsTested; assertEquals("default ramPerThreadHardLimitMB", -1, sic.ramPerThreadHardLimitMB);
++numDefaultsTested; assertEquals("default writeLockTimeout", -1, sic.writeLockTimeout);
++numDefaultsTested; assertEquals("default LockType", DirectoryFactory.LOCK_TYPE_NATIVE, sic.lockType);
++numDefaultsTested; assertEquals("default infoStream", InfoStream.NO_OUTPUT, sic.infoStream);
++numDefaultsTested; assertNotNull("default metrics", sic.metricsInfo);
++numDefaultsTested; ++numNullDefaults;
assertNull("default mergePolicyFactoryInfo", sic.mergePolicyFactoryInfo);
++numDefaultsTested; ++numNullDefaults; assertNull("default mergeSchedulerInfo", sic.mergeSchedulerInfo);
++numDefaultsTested; ++numNullDefaults; assertNull("default mergedSegmentWarmerInfo", sic.mergedSegmentWarmerInfo);
IndexSchema indexSchema = IndexSchemaFactory.buildIndexSchema("schema.xml", solrConfig);
IndexWriterConfig iwc = sic.toIndexWriterConfig(h.getCore());
assertNotNull("null mp", iwc.getMergePolicy());
assertTrue("mp is not TieredMergePolicy", iwc.getMergePolicy() instanceof TieredMergePolicy);
assertNotNull("null ms", iwc.getMergeScheduler());
assertTrue("ms is not CMS", iwc.getMergeScheduler() instanceof ConcurrentMergeScheduler);
assertNull("non-null mergedSegmentWarmer", iwc.getMergedSegmentWarmer());
final int numDefaultsMapped = sic.toMap(new LinkedHashMap<>()).size();
assertEquals("numDefaultsTested vs. numDefaultsMapped+numNullDefaults ="+sic.toMap(new LinkedHashMap<>()).keySet(), numDefaultsTested, numDefaultsMapped+numNullDefaults);
}