The following examples show how org.apache.lucene.index.IndexWriter#addIndexes() is used in real projects.
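Before the project examples, here is a minimal, self-contained sketch of the Directory-based overload. It assumes a recent Lucene API (5.x or later); the /tmp paths and the StandardAnalyzer are placeholder choices, not taken from the examples below.

import java.nio.file.Paths;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class AddIndexesSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical index locations; replace with real paths.
    try (Directory source = FSDirectory.open(Paths.get("/tmp/source-index"));
         Directory target = FSDirectory.open(Paths.get("/tmp/merged-index"));
         IndexWriter writer = new IndexWriter(target, new IndexWriterConfig(new StandardAnalyzer()))) {
      // Copies every segment of the source index into the target index.
      writer.addIndexes(source);
      writer.commit();
    }
  }
}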
@Override
public int doLogic() throws Exception {
  IndexWriter writer = getRunData().getIndexWriter();
  if (useAddIndexesDir) {
    writer.addIndexes(inputDir);
  } else {
    try (IndexReader r = DirectoryReader.open(inputDir)) {
      CodecReader leaves[] = new CodecReader[r.leaves().size()];
      int i = 0;
      for (LeafReaderContext leaf : r.leaves()) {
        leaves[i++] = SlowCodecReaderWrapper.wrap(leaf.reader());
      }
      writer.addIndexes(leaves);
    }
  }
  return 1;
}
/**
 * Merges the given taxonomy and index directories and commits the changes to
 * the given writers.
 */
public static void merge(Directory srcIndexDir, Directory srcTaxoDir, OrdinalMap map, IndexWriter destIndexWriter,
    DirectoryTaxonomyWriter destTaxoWriter, FacetsConfig srcConfig) throws IOException {
  // merge the taxonomies
  destTaxoWriter.addTaxonomy(srcTaxoDir, map);
  int ordinalMap[] = map.getMap();
  DirectoryReader reader = DirectoryReader.open(srcIndexDir);
  try {
    List<LeafReaderContext> leaves = reader.leaves();
    int numReaders = leaves.size();
    CodecReader wrappedLeaves[] = new CodecReader[numReaders];
    for (int i = 0; i < numReaders; i++) {
      wrappedLeaves[i] = SlowCodecReaderWrapper.wrap(new OrdinalMappingLeafReader(leaves.get(i).reader(), ordinalMap, srcConfig));
    }
    destIndexWriter.addIndexes(wrappedLeaves);
    // commit changes to taxonomy and index respectively.
    destTaxoWriter.commit();
    destIndexWriter.commit();
  } finally {
    reader.close();
  }
}
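A hedged usage sketch of the merge(...) helper above: the directory paths, the KeywordAnalyzer, and the MemoryOrdinalMap choice are illustrative assumptions, and imports plus the enclosing class (whose method would declare throws IOException) are omitted in the same fragment style as the other snippets.

// Hypothetical source and destination locations.
Directory srcIndexDir = FSDirectory.open(Paths.get("/tmp/src-index"));
Directory srcTaxoDir = FSDirectory.open(Paths.get("/tmp/src-taxo"));
IndexWriter destIndexWriter = new IndexWriter(
    FSDirectory.open(Paths.get("/tmp/dest-index")), new IndexWriterConfig(new KeywordAnalyzer()));
DirectoryTaxonomyWriter destTaxoWriter = new DirectoryTaxonomyWriter(
    FSDirectory.open(Paths.get("/tmp/dest-taxo")));

// MemoryOrdinalMap keeps the source-to-destination ordinal mapping in RAM.
merge(srcIndexDir, srcTaxoDir, new DirectoryTaxonomyWriter.MemoryOrdinalMap(),
    destIndexWriter, destTaxoWriter, new FacetsConfig());

destTaxoWriter.close();
destIndexWriter.close();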
@Test
public void testSymlinkWithIndexes() throws IOException {
  HdfsDirectory dir1 = new HdfsDirectory(_configuration, new Path(_base, "dir1"));
  IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer());
  IndexWriter writer1 = new IndexWriter(dir1, conf.clone());
  writer1.addDocument(getDoc());
  writer1.close();
  HdfsDirectory dir2 = new HdfsDirectory(_configuration, new Path(_base, "dir2"));
  IndexWriter writer2 = new IndexWriter(dir2, conf.clone());
  writer2.addIndexes(dir1);
  writer2.close();
  DirectoryReader reader1 = DirectoryReader.open(dir1);
  DirectoryReader reader2 = DirectoryReader.open(dir2);
  assertEquals(1, reader1.maxDoc());
  assertEquals(1, reader2.maxDoc());
  assertEquals(1, reader1.numDocs());
  assertEquals(1, reader2.numDocs());
  Document document1 = reader1.document(0);
  Document document2 = reader2.document(0);
  assertEquals(document1.get("id"), document2.get("id"));
}
public static void main(String[] args) throws Exception {
  Options options = null;
  try {
    options = Options.parse(args);
  } catch (IllegalArgumentException e) {
    System.err.println(e.getMessage());
    System.exit(2);
  }
  // Try to use hardlinks to source segments, if possible.
  Directory mergedIndex = new HardlinkCopyDirectoryWrapper(FSDirectory.open(Paths.get(options.mergedIndexPath)));
  Directory[] indexes = new Directory[options.indexPaths.length];
  for (int i = 0; i < indexes.length; i++) {
    indexes[i] = FSDirectory.open(Paths.get(options.indexPaths[i]));
  }
  IndexWriter writer = new IndexWriter(mergedIndex, options.config);
  System.out.println("Merging...");
  writer.addIndexes(indexes);
  if (options.maxSegments > 0) {
    System.out.println("Force-merging to " + options.maxSegments + "...");
    writer.forceMerge(options.maxSegments);
  }
  writer.close();
  System.out.println("Done.");
}
public static void addIndexesSlowly(IndexWriter writer, DirectoryReader... readers) throws IOException {
  List<CodecReader> leaves = new ArrayList<>();
  for (DirectoryReader reader : readers) {
    for (LeafReaderContext context : reader.leaves()) {
      leaves.add(SlowCodecReaderWrapper.wrap(context.reader()));
    }
  }
  writer.addIndexes(leaves.toArray(new CodecReader[leaves.size()]));
}
private void copyAndOptimizeInFlightDir() throws IOException {
  CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
  copyRateDirectory.setLockFactory(NoLockFactory.getNoLockFactory());
  DirectoryReader reader = DirectoryReader.open(_localDir);
  IndexWriter writer = new IndexWriter(copyRateDirectory, _conf.clone());
  writer.addIndexes(reader);
  writer.setCommitData(getInternalMarker());
  writer.close();
  rm(_localPath);
}
@Override
public void close(TaskAttemptContext context) throws IOException {
  LOG.debug("Task " + context.getTaskAttemptID() + " merging into dstDir: " + workDir + ", srcDirs: " + shards);
  writeShardNumberFile(context);
  heartBeater.needHeartBeat();
  try {
    Directory mergedIndex = new HdfsDirectory(workDir, context.getConfiguration());
    // TODO: shouldn't we pull the Version from the solrconfig.xml?
    IndexWriterConfig writerConfig = new IndexWriterConfig(Version.LUCENE_CURRENT, null)
        .setOpenMode(OpenMode.CREATE).setUseCompoundFile(false)
        //.setMergePolicy(mergePolicy) // TODO: grab tuned MergePolicy from solrconfig.xml?
        //.setMergeScheduler(...) // TODO: grab tuned MergeScheduler from solrconfig.xml?
        ;
    if (LOG.isDebugEnabled()) {
      writerConfig.setInfoStream(System.out);
    }
    // writerConfig.setRAMBufferSizeMB(100); // improve performance
    // writerConfig.setMaxThreadStates(1);
    // disable compound file to improve performance
    // also see http://lucene.472066.n3.nabble.com/Questions-on-compound-file-format-td489105.html
    // also see defaults in SolrIndexConfig
    MergePolicy mergePolicy = writerConfig.getMergePolicy();
    LOG.debug("mergePolicy was: {}", mergePolicy);
    if (mergePolicy instanceof TieredMergePolicy) {
      ((TieredMergePolicy) mergePolicy).setNoCFSRatio(0.0);
      // ((TieredMergePolicy) mergePolicy).setMaxMergeAtOnceExplicit(10000);
      // ((TieredMergePolicy) mergePolicy).setMaxMergeAtOnce(10000);
      // ((TieredMergePolicy) mergePolicy).setSegmentsPerTier(10000);
    } else if (mergePolicy instanceof LogMergePolicy) {
      ((LogMergePolicy) mergePolicy).setNoCFSRatio(0.0);
    }
    LOG.info("Using mergePolicy: {}", mergePolicy);
    IndexWriter writer = new IndexWriter(mergedIndex, writerConfig);
    Directory[] indexes = new Directory[shards.size()];
    for (int i = 0; i < shards.size(); i++) {
      indexes[i] = new HdfsDirectory(shards.get(i), context.getConfiguration());
    }
    context.setStatus("Logically merging " + shards.size() + " shards into one shard");
    LOG.info("Logically merging " + shards.size() + " shards into one shard: " + workDir);
    long start = System.nanoTime();
    writer.addIndexes(indexes);
    // TODO: avoid intermediate copying of files into dst directory; rename the files into the dir instead (cp -> rename)
    // This can improve performance and turns this phase into a true "logical" merge, completing in constant time.
    // See https://issues.apache.org/jira/browse/LUCENE-4746
    if (LOG.isDebugEnabled()) {
      // start comes from System.nanoTime(), so report the elapsed time to the counter in millis
      context.getCounter(SolrCounters.class.getName(), SolrCounters.LOGICAL_TREE_MERGE_TIME.toString()).increment((System.nanoTime() - start) / 1000000);
    }
    float secs = (System.nanoTime() - start) / 1e9f; // elapsed nanos -> seconds
    LOG.info("Logical merge took {} secs", secs);
    int maxSegments = context.getConfiguration().getInt(TreeMergeMapper.MAX_SEGMENTS_ON_TREE_MERGE, Integer.MAX_VALUE);
    context.setStatus("Optimizing Solr: forcing tree merge down to " + maxSegments + " segments");
    LOG.info("Optimizing Solr: forcing tree merge down to {} segments", maxSegments);
    start = System.nanoTime();
    if (maxSegments < Integer.MAX_VALUE) {
      writer.forceMerge(maxSegments);
      // TODO: consider perf enhancement for no-deletes merges: bulk-copy the postings data
      // see http://lucene.472066.n3.nabble.com/Experience-with-large-merge-factors-tp1637832p1647046.html
    }
    if (LOG.isDebugEnabled()) {
      context.getCounter(SolrCounters.class.getName(), SolrCounters.PHYSICAL_TREE_MERGE_TIME.toString()).increment((System.nanoTime() - start) / 1000000);
    }
    secs = (System.nanoTime() - start) / 1e9f;
    LOG.info("Optimizing Solr: done forcing tree merge down to {} segments in {} secs", maxSegments, secs);
    start = System.nanoTime();
    LOG.info("Optimizing Solr: Closing index writer");
    writer.close();
    secs = (System.nanoTime() - start) / 1e9f;
    LOG.info("Optimizing Solr: Done closing index writer in {} secs", secs);
context.setStatus("Done");
} finally {
heartBeater.cancelHeartBeat();
heartBeater.close();
}
}