下面列出了 org.apache.lucene.index.IndexCommit API 类的示例代码及用法，也可以点击链接到 GitHub 查看源代码。
/**
 * Reads the {@link SegmentInfos} of the commit currently exposed by the given searcher manager.
 * <p>
 * A searcher is acquired for the duration of the call and is always released. If reading the
 * segments info from the searcher's commit fails with an {@link IOException}, the last committed
 * segments info is read from {@code store} instead; if that fallback also fails, its exception is
 * thrown with the original failure attached as a suppressed exception.
 *
 * @param sm    searcher manager whose current reader identifies the commit to read
 * @param store fallback source used when reading from the commit fails
 * @return the segments info of the commit
 * @throws IOException if both the commit read and the store fallback fail
 */
protected static SegmentInfos readLastCommittedSegmentInfos(final SearcherManager sm, final Store store) throws IOException {
    final IndexSearcher acquired = sm.acquire();
    try {
        final DirectoryReader reader = (DirectoryReader) acquired.getIndexReader();
        return Lucene.readSegmentInfos(reader.getIndexCommit());
    } catch (IOException primaryFailure) {
        // The commit itself could not be read: fall back to the store's last committed segments info.
        try {
            return store.readLastCommittedSegmentsInfo();
        } catch (IOException fallbackFailure) {
            fallbackFailure.addSuppressed(primaryFailure);
            throw fallbackFailure;
        }
    } finally {
        sm.release(acquired);
    }
}
/**
 * Finds the highest position in {@code commits} of a safe index commit, i.e. one whose max
 * sequence number is not greater than {@code globalCheckpoint}.
 * <p>
 * Commits are sorted by age (the 0th one is the oldest commit). The translog UUID of the last
 * (newest) commit is used as the reference. While scanning from newest to oldest, a commit with a
 * different translog UUID does not belong to this engine, so the scan stops and the position just
 * after it is returned. If no safe commit is found, {@code 0} (the oldest commit) is returned.
 *
 * @param commits          the index commits, oldest first; must not be empty
 * @param globalCheckpoint the checkpoint each commit's max sequence number is compared against
 * @return the position of the commit to keep
 * @throws IOException if reading a commit's user data fails
 */
private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long globalCheckpoint) throws IOException {
    final int newest = commits.size() - 1;
    final String expectedTranslogUUID = commits.get(newest).getUserData().get(Translog.TRANSLOG_UUID_KEY);
    int position = newest;
    while (position >= 0) {
        final Map<String, String> userData = commits.get(position).getUserData();
        final boolean sameTranslog = expectedTranslogUUID.equals(userData.get(Translog.TRANSLOG_UUID_KEY));
        if (sameTranslog == false) {
            // A different translog UUID means this commit is not ours; keep the commit after it.
            return position + 1;
        }
        if (Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)) <= globalCheckpoint) {
            return position;
        }
        position--;
    }
    // Indices created before 6.2 or recovered from remote might not have a safe commit;
    // fall back to the oldest index commit.
    return 0;
}
/**
 * Creates a handler for the given index directory that notifies {@code callback} when the index
 * is updated.
 * <p>
 * If an index already exists in {@code indexDir}, the current revision files and version are
 * initialized from the last commit returned by {@link DirectoryReader#listCommits}; otherwise both
 * remain {@code null}.
 *
 * @param indexDir the index directory
 * @param callback invoked to notify that the index was updated
 * @throws IOException if checking for, listing, or reading the existing commits fails
 */
public IndexReplicationHandler(Directory indexDir, Callable<Boolean> callback) throws IOException {
    this.callback = callback;
    this.indexDir = indexDir;
    currentRevisionFiles = null;
    currentVersion = null;
    if (!DirectoryReader.indexExists(indexDir)) {
        return; // no index yet: nothing to initialize
    }
    final List<IndexCommit> commits = DirectoryReader.listCommits(indexDir);
    final IndexCommit latest = commits.get(commits.size() - 1);
    currentRevisionFiles = IndexRevision.revisionFiles(latest);
    currentVersion = IndexRevision.revisionVersion(latest);
    final InfoStream infoStream = InfoStream.getDefault();
    if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
        final String versionInfo = "constructor(): currentVersion=" + currentVersion
            + " currentRevisionFiles=" + currentRevisionFiles;
        infoStream.message(INFO_STREAM_COMPONENT, versionInfo);
        infoStream.message(INFO_STREAM_COMPONENT, "constructor(): commit=" + latest);
    }
}
/**
 * Lists the files referenced by the commit with the given generation, sorted by file name.
 *
 * @param commitGen the commit generation to look up
 * @return the commit's files, or an empty list (after logging a warning) if the generation is unknown
 * @throws LukeException if the commit's file names cannot be read
 */
@Override
public List<File> getFiles(long commitGen) throws LukeException {
    final IndexCommit commit = getCommitMap().get(commitGen);
    if (commit != null) {
        try {
            return commit.getFileNames().stream()
                .map(fileName -> File.of(indexPath, fileName))
                .sorted(Comparator.comparing(File::getFileName))
                .collect(Collectors.toList());
        } catch (IOException e) {
            final String failure = String.format(Locale.ENGLISH, "Failed to load files for commit generation %d", commitGen);
            throw new LukeException(failure, e);
        }
    }
    final String missing = String.format(Locale.ENGLISH, "Commit generation %d not exists.", commitGen);
    log.warn(missing);
    return Collections.emptyList();
}
/**
 * Builds a summary of the given reader's index, including:
 * <ul>
 *   <li>document counts and heap usage;</li>
 *   <li>version and segment count;</li>
 *   <li>whether the reader is current and whether it has deletions;</li>
 *   <li>the directory;</li>
 *   <li>details of the reader's commit: segments file name and size, and user data;</li>
 *   <li>a {@code lastModified} date when the user data contains
 *       {@link SolrIndexWriter#COMMIT_TIME_MSEC_KEY}.</li>
 * </ul>
 *
 * @param reader the reader to describe
 * @return the index information as an ordered map
 * @throws IOException if reading commit information fails
 */
public static SimpleOrderedMap<Object> getIndexInfo(DirectoryReader reader) throws IOException {
    final Directory dir = reader.directory();
    final int numDocs = reader.numDocs();
    final int maxDoc = reader.maxDoc();

    final SimpleOrderedMap<Object> info = new SimpleOrderedMap<>();
    info.add("numDocs", numDocs);
    info.add("maxDoc", maxDoc);
    info.add("deletedDocs", maxDoc - numDocs);
    info.add("indexHeapUsageBytes", getIndexHeapUsed(reader));
    // TODO: is this different than IndexReader.getCurrentVersion(dir)?
    info.add("version", reader.getVersion());
    info.add("segmentCount", reader.leaves().size());
    info.add("current", closeSafe(reader::isCurrent));
    info.add("hasDeletions", reader.hasDeletions());
    info.add("directory", dir);

    final IndexCommit commit = reader.getIndexCommit();
    info.add("segmentsFile", commit.getSegmentsFileName());
    info.add("segmentsFileSizeInBytes", getSegmentsFileLength(commit));
    final Map<String, String> userData = commit.getUserData();
    info.add("userData", userData);
    final String commitTimeMsec = userData.get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
    if (commitTimeMsec != null) {
        info.add("lastModified", new Date(Long.parseLong(commitTimeMsec)));
    }
    return info;
}
/**
 * Describes every commit known to the core's deletion policy. For each commit it records the
 * index version (commit timestamp), the generation, and the sorted list of file names. A commit
 * whose data cannot be read is skipped after logging a warning.
 *
 * @return one entry per readable commit
 */
private List<NamedList<Object>> getCommits() {
    final Map<Long, IndexCommit> known = core.getDeletionPolicy().getCommits();
    final List<NamedList<Object>> result = new ArrayList<>();
    for (IndexCommit commit : known.values()) {
        try {
            final NamedList<Object> entry = new NamedList<>();
            entry.add("indexVersion", IndexDeletionPolicyWrapper.getCommitTimestamp(commit));
            entry.add(GENERATION, commit.getGeneration());
            final List<String> sortedFiles = new ArrayList<>(commit.getFileNames());
            Collections.sort(sortedFiles);
            entry.add(CMD_GET_FILE_LIST, sortedFiles);
            result.add(entry);
        } catch (IOException e) {
            log.warn("Exception while reading files for commit {}", commit, e);
        }
    }
    return result;
}
/**
 * Retrieves the list of tlog files associated with a commit point.
 * <p>
 * A tlog is included when its start version is lower than the commit's max version. The start
 * version is the absolute value of the number after the last '.' in the file name. Each entry
 * maps {@code NAME} to the file name and {@code SIZE} to the file length.
 * <p>
 * NOTE: The commit <b>MUST</b> be reserved before calling this method.
 *
 * @param commit the reserved commit point
 * @return metadata for each matching tlog file
 * @throws IOException if reading the commit's max version fails
 */
List<Map<String, Object>> getTlogFileList(IndexCommit commit) throws IOException {
    final long maxVersion = this.getMaxVersion(commit);
    final CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
    final String[] logNames = ulog.getLogList(new File(ulog.getLogDir()));
    final List<Map<String, Object>> tlogFiles = new ArrayList<>();
    for (String fileName : logNames) {
        // Exclude tlogs starting at or after the commit's max version, so that the list stays in
        // sync with this commit point.
        final String versionSuffix = fileName.substring(fileName.lastIndexOf('.') + 1);
        final long startVersion = Math.abs(Long.parseLong(versionSuffix));
        if (startVersion >= maxVersion) {
            continue;
        }
        final Map<String, Object> fileMeta = new HashMap<>();
        fileMeta.put(NAME, fileName);
        fileMeta.put(SIZE, new File(ulog.getLogDir(), fileName).length());
        tlogFiles.add(fileMeta);
    }
    return tlogFiles;
}
/**
 * Atomically saves (via reference counting) and returns the specified commit if available.
 * <p>
 * If the return value is non-null, then the caller <em>MUST</em> call {@link #releaseCommitPoint}
 * when finished using it in order to decrement the reference count, or the commit will be preserved
 * in the Directory forever.
 * </p>
 * <p>
 * NOTE(review): the reference count for {@code generation} is incremented even when this method
 * returns {@code null}. That happens when the generation is not a known commit and is not older
 * than the latest commit. Confirm whether callers must release in that case too.
 * </p>
 *
 * @param generation the generation to save; must not be null
 * @return the commit point with the specified generation, or null if not available
 * @throws NullPointerException if {@code generation} is null
 * @throws IllegalStateException if the commit is known but deleted, or unknown and older than the
 *         latest commit
 * @see #saveCommitPoint
 * @see #releaseCommitPoint
 */
public synchronized IndexCommit getAndSaveCommitPoint(Long generation) {
    if (generation == null) {
        throw new NullPointerException("generation to get and save must not be null");
    }
    final IndexCommit commit = knownCommits.get(generation);
    final boolean tooOld;
    if (commit != null) {
        tooOld = commit.isDeleted();
    } else {
        tooOld = latestCommit != null && generation < latestCommit.getGeneration();
    }
    if (tooOld) {
        throw new IllegalStateException("Specified index generation is too old to be saved: " + generation);
    }
    final int currentCount = savedCommits.computeIfAbsent(generation, ignored -> new AtomicInteger()).incrementAndGet();
    log.debug("Saving generation={}, refCount={}", generation, currentCount);
    return commit;
}
/**
 * Checks that a commit is no longer listed by the deletion policy after the configured max commit
 * age has elapsed and a new document has been added and optimized.
 */
@Test
public void testCommitAge() throws InterruptedException {
    assumeFalse("This test is not working on Windows (or maybe machines with only 2 CPUs)",
        Constants.WINDOWS);
    final IndexDeletionPolicyWrapper delPolicy = h.getCore().getDeletionPolicy();
    addDocs();
    final IndexCommit initialCommit = delPolicy.getLatestCommit();

    // Strip letters and '-' from the configured max commit age, then sleep that many milliseconds.
    final SolrDeletionPolicy solrPolicy = (SolrDeletionPolicy) delPolicy.getWrappedDeletionPolicy();
    final String ageDigits = solrPolicy.getMaxCommitAge().replaceAll("[a-zA-Z]", "").replaceAll("-", "");
    Thread.sleep(Long.parseLong(ageDigits));

    final String docId = String.valueOf(6);
    assertU(adoc("id", docId, "name", "name" + docId));
    assertU(optimize());
    assertQ("return all docs", req("id:[0 TO 6]"), "*[count(//doc)=6]");

    final Map<Long, IndexCommit> remaining = delPolicy.getCommits();
    assertTrue(!remaining.containsKey(initialCommit.getGeneration()));
}
/**
 * Deletes every commit except the last one in {@code commits}, skipping commits whose generation
 * is a key of {@code _generationsToNames} (i.e. has snapshot names). Runs under the write lock.
 *
 * @param commits the commits passed in by Lucene
 * @throws IOException declared by the deletion-policy contract
 */
@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
    _writeLock.lock();
    try {
        final int lastIndex = commits.size() - 1;
        for (int i = 0; i < lastIndex; i++) {
            final IndexCommit candidate = commits.get(i);
            if (_generationsToNames.containsKey(candidate.getGeneration())) {
                continue; // generation is referenced by a snapshot: keep it
            }
            candidate.delete();
        }
    } finally {
        _writeLock.unlock();
    }
}
/**
 * Records a snapshot called {@code name} for the commit generation of {@code reader}, and then
 * calls {@code storeGenerations()}. Runs under the write lock.
 *
 * @param name    the snapshot name; must not already exist
 * @param reader  reader whose index commit generation is snapshotted
 * @param context context string included in the log message
 * @throws IOException if a snapshot with this name already exists, or as propagated from reading
 *         the commit or storing the generations
 */
public void createSnapshot(String name, DirectoryReader reader, String context) throws IOException {
    _writeLock.lock();
    try {
        if (_namesToGenerations.containsKey(name)) {
            throw new IOException("Snapshot [" + name + "] already exists.");
        }
        LOG.info("Creating snapshot [{0}] in [{1}].", name, context);
        final long generation = reader.getIndexCommit().getGeneration();
        _namesToGenerations.put(name, generation);

        // Register the name under its generation, creating the name set on first use.
        Set<String> snapshotNames = _generationsToNames.get(generation);
        if (snapshotNames == null) {
            snapshotNames = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
            _generationsToNames.put(generation, snapshotNames);
        }
        snapshotNames.add(name);

        storeGenerations();
    } finally {
        _writeLock.unlock();
    }
}
/**
 * Acquires the last index commit, optionally flushing first. The returned reference releases the
 * commit through {@code releaseIndexCommit} when it is closed or released.
 *
 * @param flushFirst whether to flush before acquiring the commit
 * @return a reference to the acquired commit
 */
@Override
public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
    // Flush outside of the read lock; otherwise we might have a problem upgrading the lock to a
    // write lock when we fail the engine during this operation.
    if (flushFirst) {
        logger.trace("start flush for snapshot");
        flush(false, true);
        logger.trace("finish flush for snapshot");
    }
    // NOTE(review): the 'false' argument presumably selects the last (not the safe) commit — confirm.
    final IndexCommit acquired = combinedDeletionPolicy.acquireIndexCommit(false);
    return new Engine.IndexCommitRef(acquired, () -> releaseIndexCommit(acquired));
}
/**
 * Releases a commit previously acquired for snapshotting. If the deletion policy reports that the
 * commit can now be cleaned up, this calls {@code ensureOpen()} and then asks the index writer to
 * delete unused files.
 *
 * @param snapshot the commit to release
 * @throws IOException if deleting unused files fails
 */
private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
    final boolean canCleanUp = combinedDeletionPolicy.releaseCommit(snapshot);
    if (!canCleanUp) {
        return;
    }
    ensureOpen();
    // No translog trimming is needed: snapshotting an index commit neither locks the translog nor
    // prevents unreferenced files from being trimmed.
    indexWriter.deleteUnusedFiles();
}
/**
 * Called by Lucene. Same as {@link #onCommit(java.util.List)}, except that an empty commit list is
 * ignored.
 * <p>
 * The list may be empty when a new index is created: since Lucene 4.4, onInit is called even with
 * an empty commits list.
 */
@Override
public void onInit(List<? extends IndexCommit> commits) throws IOException {
    if (commits.isEmpty()) {
        return; // new index: nothing to process
    }
    onCommit(commits);
}
/**
 * Called by Lucene. Wraps the provided commits with {@link SnapshotIndexCommit} and delegates to
 * the wrapped (primary) deletion policy. It then:
 * <ul>
 *   <li>drops snapshot holders whose counter is not positive;</li>
 *   <li>stores the commits the primary policy did not delete in {@code commits};</li>
 *   <li>sets {@code lastCommit} to the last of those commits.</li>
 * </ul>
 * Everything after the assertion runs while synchronized on {@code mutex}.
 *
 * @param commits the commits passed in by Lucene; must not be empty
 * @throws IOException if wrapping the commits or the primary policy fails
 */
@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
    assert !commits.isEmpty() : "Commits must not be empty";
    synchronized (mutex) {
        final List<SnapshotIndexCommit> wrapped = wrapCommits(commits);
        primary.onCommit(wrapped);

        // Remove snapshot holders whose counter dropped to zero or below (should not really happen).
        final Iterator<SnapshotHolder> holders = snapshots.values().iterator();
        while (holders.hasNext()) {
            if (holders.next().counter <= 0) {
                holders.remove();
            }
        }

        // Keep only the commits that the primary policy did not delete.
        final List<SnapshotIndexCommit> surviving = new ArrayList<>();
        for (SnapshotIndexCommit candidate : wrapped) {
            if (candidate.isDeleted() == false) {
                surviving.add(candidate);
            }
        }
        this.commits = surviving;
        // NOTE(review): if the primary policy deleted every commit, this get(-1) throws.
        this.lastCommit = surviving.get(surviving.size() - 1);
    }
}
/**
 * Wraps each commit in a {@link SnapshotIndexCommit} bound to this policy, preserving list order.
 *
 * @param commits the commits to wrap
 * @return a new list of wrappers, one per input commit
 * @throws IOException if creating a wrapper fails
 */
private List<SnapshotIndexCommit> wrapCommits(List<? extends IndexCommit> commits) throws IOException {
    final List<SnapshotIndexCommit> wrapped = new ArrayList<>(commits.size());
    for (IndexCommit commit : commits) {
        wrapped.add(new SnapshotIndexCommit(this, commit));
    }
    return wrapped;
}
/**
 * Deletes all commits except the most recent one (the last element of {@code commits}).
 * An empty list is a no-op.
 *
 * @param commits the commits passed in by Lucene
 */
@Override
public void onCommit(List<? extends IndexCommit> commits) {
    // commits.size() should normally be 2 here (unless called from onInit).
    final int keepFrom = commits.size() - 1;
    int index = 0;
    while (index < keepFrom) {
        commits.get(index).delete();
        index++;
    }
}
/**
 * Releases an index commit that was acquired by {@link #acquireIndexCommit(boolean)}.
 * <p>
 * The snapshot reference count of the underlying commit is decremented, and the commit is removed
 * from the snapshotted set once the count reaches zero.
 *
 * @param snapshotCommit the commit previously returned by {@link #acquireIndexCommit(boolean)}
 * @return true if the snapshotted commit can be cleaned up: no snapshot references remain and it
 *         is neither the safe commit nor the last commit
 */
synchronized boolean releaseCommit(final IndexCommit snapshotCommit) {
    final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).delegate;
    assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit; " +
        "snapshotted commits [" + snapshottedCommits + "], releasing commit [" + releasingCommit + "]";
    final int refCount = snapshottedCommits.addTo(releasingCommit, -1); // release refCount
    assert refCount >= 0 : "Number of snapshots can not be negative [" + refCount + "]";
    if (refCount == 0) {
        snapshottedCommits.remove(releasingCommit);
    }
    // The commit can be cleaned up only if there are no pending snapshots and it is neither the
    // safe commit nor the last commit.
    return refCount == 0 && releasingCommit.equals(safeCommit) == false && releasingCommit.equals(lastCommit) == false;
}
/**
 * Wraps {@code cp} for the given snapshot deletion policy and captures an array copy of the
 * commit's file names (in the collection's iteration order).
 *
 * @param deletionPolicy the owning snapshot deletion policy
 * @param cp             the commit to wrap
 * @throws IOException if the commit's file names cannot be read
 */
SnapshotIndexCommit(SnapshotDeletionPolicy deletionPolicy, IndexCommit cp) throws IOException {
    super(cp);
    this.deletionPolicy = deletionPolicy;
    files = cp.getFileNames().toArray(new String[0]);
}
/**
 * Returns the first commit in {@code dir}, in {@link DirectoryReader#listCommits} order, whose
 * user data maps {@code USER_DATA} to a value equal to {@code userData}.
 *
 * @param dir      the directory holding the index
 * @param userData the user-data value to look for
 * @return the matching commit
 * @throws IOException if listing commits fails, or if no commit carries the requested user data
 */
public static IndexCommit findIndexCommit(Directory dir, String userData) throws IOException {
    for (final IndexCommit candidate : DirectoryReader.listCommits(dir)) {
        final Map<String,String> commitData = candidate.getUserData();
        final String stored = (commitData == null) ? null : commitData.get(USER_DATA);
        if (stored != null && stored.equals(userData)) {
            return candidate;
        }
    }
    throw new IOException("index does not contain commit with userData: " + userData);
}
/**
 * Acquires the last index commit, optionally flushing first. The returned reference releases the
 * commit through {@code releaseIndexCommit} when it is closed or released.
 *
 * @param flushFirst whether to flush before acquiring the commit
 * @return a reference to the acquired commit
 */
@Override
public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
    // Flush outside of the read lock; otherwise we might have a problem upgrading the lock to a
    // write lock when we fail the engine during this operation.
    if (flushFirst) {
        logger.trace("start flush for snapshot");
        flush(false, true);
        logger.trace("finish flush for snapshot");
    }
    // NOTE(review): the 'false' argument presumably selects the last (not the safe) commit — confirm.
    final IndexCommit acquired = combinedDeletionPolicy.acquireIndexCommit(false);
    return new Engine.IndexCommitRef(acquired, () -> releaseIndexCommit(acquired));
}
/**
 * Opens an {@link IndexWriter} in {@code APPEND} mode and installs it as the run's index writer.
 * <p>
 * When {@code commitUserData} is set, the writer is configured with the commit whose user data
 * matches it; otherwise {@code null} is passed as the commit.
 *
 * @return always 1
 * @throws IOException if the commit cannot be found or the writer cannot be configured
 */
@Override
public int doLogic() throws IOException {
    final PerfRunData runData = getRunData();
    final Config config = runData.getConfig();
    final IndexCommit startCommit = (commitUserData == null)
        ? null
        : OpenReaderTask.findIndexCommit(runData.getDirectory(), commitUserData);
    runData.setIndexWriter(CreateIndexTask.configureWriter(config, runData, OpenMode.APPEND, startCommit));
    return 1;
}
/**
 * Creates a handler for the given search-index and taxonomy directories that notifies
 * {@code callback} when the indexes are updated.
 * <p>
 * Both indexes must either exist or be absent. If they exist, the current revision files and
 * version are initialized from their last commits; otherwise both remain {@code null}.
 *
 * @param indexDir the search index directory
 * @param taxoDir  the taxonomy index directory
 * @param callback invoked to notify that the indexes were updated
 * @throws IllegalStateException if exactly one of the two indexes exists
 * @throws IOException if checking for or reading the existing commits fails
 */
public IndexAndTaxonomyReplicationHandler(Directory indexDir, Directory taxoDir, Callable<Boolean> callback)
        throws IOException {
    this.callback = callback;
    this.indexDir = indexDir;
    this.taxoDir = taxoDir;
    currentRevisionFiles = null;
    currentVersion = null;
    final boolean indexExists = DirectoryReader.indexExists(indexDir);
    final boolean taxoExists = DirectoryReader.indexExists(taxoDir);
    if (indexExists != taxoExists) {
        throw new IllegalStateException("search and taxonomy indexes must either both exist or not: index=" + indexExists
            + " taxo=" + taxoExists);
    }
    if (!indexExists) {
        return; // neither index exists: nothing to initialize
    }
    final IndexCommit indexCommit = IndexReplicationHandler.getLastCommit(indexDir);
    final IndexCommit taxoCommit = IndexReplicationHandler.getLastCommit(taxoDir);
    currentRevisionFiles = IndexAndTaxonomyRevision.revisionFiles(indexCommit, taxoCommit);
    currentVersion = IndexAndTaxonomyRevision.revisionVersion(indexCommit, taxoCommit);
    final InfoStream infoStream = InfoStream.getDefault();
    if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
        final String versionInfo = "constructor(): currentVersion=" + currentVersion
            + " currentRevisionFiles=" + currentRevisionFiles;
        final String commitInfo = "constructor(): indexCommit=" + indexCommit
            + " taxoCommit=" + taxoCommit;
        infoStream.message(INFO_STREAM_COMPONENT, versionInfo);
        infoStream.message(INFO_STREAM_COMPONENT, commitInfo);
    }
}
/**
 * Returns the revision files of the given search-index and taxonomy commits as a map with two
 * entries.
 * <ul>
 *   <li>{@code INDEX_SOURCE} maps to the index commit's files.</li>
 *   <li>{@code TAXONOMY_SOURCE} maps to the taxonomy commit's files.</li>
 * </ul>
 * (The previous Javadoc wrongly described this as a singleton map of a single commit.)
 *
 * @param indexCommit the search index commit
 * @param taxoCommit  the taxonomy index commit
 * @return a mutable map from source name to that source's revision files
 * @throws IOException if the commits' files cannot be read
 */
public static Map<String, List<RevisionFile>> revisionFiles(IndexCommit indexCommit, IndexCommit taxoCommit)
        throws IOException {
    final Map<String, List<RevisionFile>> files = new HashMap<>();
    // Only the first value of each per-commit map is used.
    // NOTE(review): presumably IndexRevision.revisionFiles returns a single-entry map — confirm.
    files.put(INDEX_SOURCE, IndexRevision.revisionFiles(indexCommit).values().iterator().next());
    files.put(TAXONOMY_SOURCE, IndexRevision.revisionFiles(taxoCommit).values().iterator().next());
    return files;
}
/**
 * Cleans up the index directory from old index files. This method uses the last commit found by
 * {@link #getLastCommit(Directory)}. If that commit's segments file name equals
 * {@code segmentsFile}, every file in {@code dir} is deleted when both of these hold:
 * <ul>
 *   <li>the commit does not reference it;</li>
 *   <li>it matches {@link IndexFileNames#CODEC_FILE_PATTERN} or starts with
 *       {@link IndexFileNames#SEGMENTS}.</li>
 * </ul>
 * <p>
 * <b>NOTE:</b> this method does a best-effort attempt to clean the index directory. It suppresses
 * any {@link Throwable} that occurs (logging only its message when {@code infoStream} is enabled),
 * as the cleanup can be retried the next time.
 */
public static void cleanupOldIndexFiles(Directory dir, String segmentsFile, InfoStream infoStream) {
    try {
        final IndexCommit commit = getLastCommit(dir);
        // A null commit means weird IO errors occurred; a segments-file mismatch means the expected
        // commit point could not be read. In both cases, skip the cleanup.
        if (commit == null || !commit.getSegmentsFileName().equals(segmentsFile)) {
            return;
        }
        final Set<String> referenced = new HashSet<>(commit.getFileNames());
        final Matcher codecFile = IndexFileNames.CODEC_FILE_PATTERN.matcher("");
        for (String file : dir.listAll()) {
            if (referenced.contains(file)) {
                continue;
            }
            final boolean isIndexFile = codecFile.reset(file).matches() || file.startsWith(IndexFileNames.SEGMENTS);
            if (isIndexFile) {
                // Best effort: exceptions from the deletion are suppressed.
                IOUtils.deleteFilesIgnoringExceptions(dir, file);
            }
        }
    } catch (Throwable t) {
        // Only log failures here; the cleanup gets another chance with the next revision.
        if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
            infoStream.message(INFO_STREAM_COMPONENT, "cleanupOldIndexFiles(): failed on error " + t.getMessage());
        }
    }
}
/**
 * Lists all commits in {@code dir} and maps them by generation, sorted by ascending generation.
 * If two commits share a generation, the later-listed one wins.
 *
 * @return the commits keyed by generation
 * @throws LukeException if the commits cannot be listed
 */
private Map<Long, IndexCommit> initCommitMap() {
    final List<IndexCommit> listed;
    try {
        listed = DirectoryReader.listCommits(dir);
    } catch (IOException e) {
        throw new LukeException("Failed to get commits list.", e);
    }
    final Map<Long, IndexCommit> byGeneration = new TreeMap<>();
    for (IndexCommit commit : listed) {
        byGeneration.put(commit.getGeneration(), commit);
    }
    return byGeneration;
}
/**
 * Looks up the commit with the given generation.
 *
 * @param commitGen the commit generation to look up
 * @return the commit, or {@link Optional#empty()} (after logging a warning) if the generation is unknown
 */
@Override
public Optional<Commit> getCommit(long commitGen) throws LukeException {
    final IndexCommit commit = getCommitMap().get(commitGen);
    if (commit != null) {
        return Optional.of(Commit.of(commit));
    }
    log.warn(String.format(Locale.ENGLISH, "Commit generation %d not exists.", commitGen));
    return Optional.empty();
}
/**
 * Reads the segments infos of the given commit from the commit's directory, failing if they cannot
 * be loaded.
 *
 * @param commit the commit whose segments infos to read
 * @return the segments infos
 * @throws IOException if the segments file cannot be read
 */
public static SegmentInfos readSegmentInfos(IndexCommit commit) throws IOException {
    // Using commit.getSegmentsFileName() does NOT work here; the segments file name has to be
    // built manually from the commit generation.
    final long generation = commit.getGeneration();
    final String segmentsFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, "", generation);
    return SegmentInfos.readCommit(commit.getDirectory(), segmentsFileName);
}
/**
 * Reads the segments infos of the commit with the given generation from {@code dir}.
 *
 * @param commitGen the commit generation to look up
 * @return the segments infos, or {@code null} if no commit with that generation is known
 * @throws IOException if the segments file cannot be read
 */
private SegmentInfos findSegmentInfos(long commitGen) throws LukeException, IOException {
    final IndexCommit commit = getCommitMap().get(commitGen);
    return (commit == null) ? null : SegmentInfos.readCommit(dir, commit.getSegmentsFileName());
}
/**
 * Creates a {@link Commit} describing the given index commit's generation, deleted flag, segment
 * count and user data.
 * <p>
 * If reading the user data throws an {@link IOException}, the exception is ignored and
 * {@code userData} is not assigned by this method.
 *
 * @param ic the index commit to describe
 * @return the new commit description
 */
static Commit of(IndexCommit ic) {
    final Commit result = new Commit();
    result.generation = ic.getGeneration();
    result.isDeleted = ic.isDeleted();
    result.segCount = ic.getSegmentCount();
    try {
        result.userData = IndexUtils.getCommitUserData(ic);
    } catch (IOException ignored) {
        // Deliberately ignored: userData is simply left unassigned.
        // NOTE(review): consider logging this failure instead of dropping it silently.
    }
    return result;
}