下面列出了 java.util.NavigableSet#descendingIterator() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Creates a tracker, registers it as a listener on {@code cache} for {@code cacheKey}, and
 * seeds the tracked regions from the spans handed back by that registration.
 *
 * @param cache the cache to register with.
 * @param cacheKey the cache key whose spans are tracked.
 * @param chunkIndex the chunk index retained by this tracker.
 */
public CachedRegionTracker(Cache cache, String cacheKey, ChunkIndex chunkIndex) {
  this.cache = cache;
  this.cacheKey = cacheKey;
  this.chunkIndex = chunkIndex;
  this.regions = new TreeSet<>();
  this.lookupRegion = new Region(0, 0);

  synchronized (this) {
    NavigableSet<CacheSpan> cacheSpans = cache.addListener(cacheKey, this);
    // Guard against a null result so that a key without any cached spans leaves the tracker
    // empty instead of failing with a NullPointerException.
    if (cacheSpans != null) {
      // Merge the spans into regions. mergeSpan is more efficient when merging from high to
      // low, which is why a descending iterator is used here.
      Iterator<CacheSpan> spanIterator = cacheSpans.descendingIterator();
      while (spanIterator.hasNext()) {
        mergeSpan(spanIterator.next());
      }
    }
  }
}
/**
 * Creates a tracker, registers it as a listener on {@code cache} for {@code cacheKey}, and
 * seeds the tracked regions from the spans handed back by that registration.
 *
 * @param cache the cache to register with.
 * @param cacheKey the cache key whose spans are tracked.
 * @param chunkIndex the chunk index retained by this tracker.
 */
public CachedRegionTracker(Cache cache, String cacheKey, ChunkIndex chunkIndex) {
  this.cache = cache;
  this.cacheKey = cacheKey;
  this.chunkIndex = chunkIndex;
  this.regions = new TreeSet<>();
  this.lookupRegion = new Region(0, 0);

  synchronized (this) {
    NavigableSet<CacheSpan> cacheSpans = cache.addListener(cacheKey, this);
    // Guard against a null result so that a key without any cached spans leaves the tracker
    // empty instead of failing with a NullPointerException.
    if (cacheSpans != null) {
      // Merge the spans into regions. mergeSpan is more efficient when merging from high to
      // low, which is why a descending iterator is used here.
      Iterator<CacheSpan> spanIterator = cacheSpans.descendingIterator();
      while (spanIterator.hasNext()) {
        mergeSpan(spanIterator.next());
      }
    }
  }
}
/**
 * Creates a tracker, registers it as a listener on {@code cache} for {@code cacheKey}, and
 * seeds the tracked regions from the spans handed back by that registration.
 *
 * @param cache the cache to register with.
 * @param cacheKey the cache key whose spans are tracked.
 * @param chunkIndex the chunk index retained by this tracker.
 */
public CachedRegionTracker(Cache cache, String cacheKey, ChunkIndex chunkIndex) {
  this.cache = cache;
  this.cacheKey = cacheKey;
  this.chunkIndex = chunkIndex;
  this.regions = new TreeSet<>();
  this.lookupRegion = new Region(0, 0);

  synchronized (this) {
    NavigableSet<CacheSpan> cacheSpans = cache.addListener(cacheKey, this);
    // Guard against a null result so that a key without any cached spans leaves the tracker
    // empty instead of failing with a NullPointerException.
    if (cacheSpans != null) {
      // Merge the spans into regions. mergeSpan is more efficient when merging from high to
      // low, which is why a descending iterator is used here.
      Iterator<CacheSpan> spanIterator = cacheSpans.descendingIterator();
      while (spanIterator.hasNext()) {
        mergeSpan(spanIterator.next());
      }
    }
  }
}
/**
 * The descending iterator of the map's navigable key set starts at {@code five} and yields
 * strictly decreasing keys, five in total.
 */
public void testKeySetDescendingIteratorOrder() {
  ConcurrentSkipListMap map = map5();
  NavigableSet s = map.navigableKeySet();
  Iterator i = s.descendingIterator();
  Integer last = (Integer)i.next();
  // Expected value first, actual value second, matching assertEquals(5, count) below.
  assertEquals(five, last);
  int count = 1;
  while (i.hasNext()) {
    Integer k = (Integer)i.next();
    // Every key must be strictly smaller than the one returned just before it.
    assertTrue(last.compareTo(k) > 0);
    last = k;
    ++count;
  }
  assertEquals(5, count);
}
/**
 * The descending iterator of the map's descendingKeySet starts at {@code one} and yields
 * strictly increasing keys, five in total.
 */
public void testDescendingKeySetDescendingIteratorOrder() {
  ConcurrentSkipListMap map = map5();
  NavigableSet s = map.descendingKeySet();
  Iterator i = s.descendingIterator();
  Integer last = (Integer)i.next();
  // Expected value first, actual value second, matching assertEquals(5, count) below.
  assertEquals(one, last);
  int count = 1;
  while (i.hasNext()) {
    Integer k = (Integer)i.next();
    // Every key must be strictly larger than the one returned just before it.
    assertTrue(last.compareTo(k) < 0);
    last = k;
    ++count;
  }
  assertEquals(5, count);
}
/**
 * The descending iterator of the map's navigable key set starts at {@code five} and yields
 * strictly decreasing keys, five in total.
 */
public void testKeySetDescendingIteratorOrder() {
  TreeMap map = map5();
  NavigableSet s = map.navigableKeySet();
  Iterator i = s.descendingIterator();
  Integer last = (Integer)i.next();
  // Expected value first, actual value second, matching assertEquals(5, count) below.
  assertEquals(five, last);
  int count = 1;
  while (i.hasNext()) {
    Integer k = (Integer)i.next();
    // Every key must be strictly smaller than the one returned just before it.
    assertTrue(last.compareTo(k) > 0);
    last = k;
    ++count;
  }
  assertEquals(5, count);
}
/**
 * The descending iterator of the map's descendingKeySet starts at {@code one} and yields
 * strictly increasing keys, five in total.
 */
public void testDescendingKeySetDescendingIteratorOrder() {
  TreeMap map = map5();
  NavigableSet s = map.descendingKeySet();
  Iterator i = s.descendingIterator();
  Integer last = (Integer)i.next();
  // Expected value first, actual value second, matching assertEquals(5, count) below.
  assertEquals(one, last);
  int count = 1;
  while (i.hasNext()) {
    Integer k = (Integer)i.next();
    // Every key must be strictly larger than the one returned just before it.
    assertTrue(last.compareTo(k) < 0);
    last = k;
    ++count;
  }
  assertEquals(5, count);
}
/**
 * Builds a tracker bound to {@code cacheKey}: stores the collaborators, subscribes itself to
 * {@code cache} and folds any spans returned by the subscription into the region set.
 *
 * @param cache the cache this tracker listens to.
 * @param cacheKey the key whose cached spans are followed.
 * @param chunkIndex the chunk index kept for later use.
 */
public CachedRegionTracker(Cache cache, String cacheKey, ChunkIndex chunkIndex) {
  this.cache = cache;
  this.cacheKey = cacheKey;
  this.chunkIndex = chunkIndex;
  this.regions = new TreeSet<>();
  this.lookupRegion = new Region(0, 0);

  synchronized (this) {
    NavigableSet<CacheSpan> initialSpans = cache.addListener(cacheKey, this);
    if (initialSpans != null) {
      // Walk from the highest span down to the lowest: mergeSpan is more efficient when
      // spans are merged in that direction.
      for (Iterator<CacheSpan> it = initialSpans.descendingIterator(); it.hasNext(); ) {
        CacheSpan current = it.next();
        mergeSpan(current);
      }
    }
  }
}
/**
 * The descending iterator of the map's navigable key set starts at {@code five} and yields
 * strictly decreasing keys, five in total.
 */
public void testKeySetDescendingIteratorOrder() {
  ConcurrentSkipListMap map = map5();
  NavigableSet s = map.navigableKeySet();
  Iterator i = s.descendingIterator();
  Integer last = (Integer)i.next();
  // Expected value first, actual value second, matching assertEquals(5, count) below.
  assertEquals(five, last);
  int count = 1;
  while (i.hasNext()) {
    Integer k = (Integer)i.next();
    // Every key must be strictly smaller than the one returned just before it.
    assertTrue(last.compareTo(k) > 0);
    last = k;
    ++count;
  }
  assertEquals(5, count);
}
/**
 * The descending iterator of the map's descendingKeySet starts at {@code one} and yields
 * strictly increasing keys, five in total.
 */
public void testDescendingKeySetDescendingIteratorOrder() {
  ConcurrentSkipListMap map = map5();
  NavigableSet s = map.descendingKeySet();
  Iterator i = s.descendingIterator();
  Integer last = (Integer)i.next();
  // Expected value first, actual value second, matching assertEquals(5, count) below.
  assertEquals(one, last);
  int count = 1;
  while (i.hasNext()) {
    Integer k = (Integer)i.next();
    // Every key must be strictly larger than the one returned just before it.
    assertTrue(last.compareTo(k) < 0);
    last = k;
    ++count;
  }
  assertEquals(5, count);
}
/**
 * The descending iterator of the map's navigable key set starts at {@code five} and yields
 * strictly decreasing keys, five in total.
 */
public void testKeySetDescendingIteratorOrder() {
  TreeMap map = map5();
  NavigableSet s = map.navigableKeySet();
  Iterator i = s.descendingIterator();
  Integer last = (Integer)i.next();
  // Expected value first, actual value second, matching assertEquals(5, count) below.
  assertEquals(five, last);
  int count = 1;
  while (i.hasNext()) {
    Integer k = (Integer)i.next();
    // Every key must be strictly smaller than the one returned just before it.
    assertTrue(last.compareTo(k) > 0);
    last = k;
    ++count;
  }
  assertEquals(5, count);
}
/**
 * The descending iterator of the map's descendingKeySet starts at {@code one} and yields
 * strictly increasing keys, five in total.
 */
public void testDescendingKeySetDescendingIteratorOrder() {
  TreeMap map = map5();
  NavigableSet s = map.descendingKeySet();
  Iterator i = s.descendingIterator();
  Integer last = (Integer)i.next();
  // Expected value first, actual value second, matching assertEquals(5, count) below.
  assertEquals(one, last);
  int count = 1;
  while (i.hasNext()) {
    Integer k = (Integer)i.next();
    // Every key must be strictly larger than the one returned just before it.
    assertTrue(last.compareTo(k) < 0);
    last = k;
    ++count;
  }
  assertEquals(5, count);
}
/**
 * Creates a tracker, registers it as a listener on {@code cache} for {@code cacheKey}, and
 * seeds the tracked regions from the spans handed back by that registration.
 *
 * @param cache the cache to register with.
 * @param cacheKey the cache key whose spans are tracked.
 * @param chunkIndex the chunk index retained by this tracker.
 */
public CachedRegionTracker(Cache cache, String cacheKey, ChunkIndex chunkIndex) {
  this.cache = cache;
  this.cacheKey = cacheKey;
  this.chunkIndex = chunkIndex;
  this.regions = new TreeSet<>();
  this.lookupRegion = new Region(0, 0);

  synchronized (this) {
    NavigableSet<CacheSpan> cacheSpans = cache.addListener(cacheKey, this);
    // Guard against a null result so that a key without any cached spans leaves the tracker
    // empty instead of failing with a NullPointerException.
    if (cacheSpans != null) {
      // Merge the spans into regions. mergeSpan is more efficient when merging from high to
      // low, which is why a descending iterator is used here.
      Iterator<CacheSpan> spanIterator = cacheSpans.descendingIterator();
      while (spanIterator.hasNext()) {
        mergeSpan(spanIterator.next());
      }
    }
  }
}
/**
 * Creates a tracker, registers it as a listener on {@code cache} for {@code cacheKey}, and
 * seeds the tracked regions from the spans handed back by that registration.
 *
 * @param cache the cache to register with.
 * @param cacheKey the cache key whose spans are tracked.
 * @param chunkIndex the chunk index retained by this tracker.
 */
public CachedRegionTracker(Cache cache, String cacheKey, ChunkIndex chunkIndex) {
  this.cache = cache;
  this.cacheKey = cacheKey;
  this.chunkIndex = chunkIndex;
  this.regions = new TreeSet<>();
  this.lookupRegion = new Region(0, 0);

  synchronized (this) {
    NavigableSet<CacheSpan> cacheSpans = cache.addListener(cacheKey, this);
    // Guard against a null result so that a key without any cached spans leaves the tracker
    // empty instead of failing with a NullPointerException.
    if (cacheSpans != null) {
      // Merge the spans into regions. mergeSpan is more efficient when merging from high to
      // low, which is why a descending iterator is used here.
      Iterator<CacheSpan> spanIterator = cacheSpans.descendingIterator();
      while (spanIterator.hasNext()) {
        mergeSpan(spanIterator.next());
      }
    }
  }
}
/**
 * Looks through a sorted set of replicas for one that can be swapped with the given replica.
 * Only replicas whose size lies strictly between {@code minSize} and {@code maxSize} are
 * considered. They are tried in order of increasing distance from {@code targetSize}, and the
 * first one accepted by {@code canSwap} is returned.
 *
 * @param replica the specific replica to swap.
 * @param sortedReplicasToSearch the sorted replicas to search in.
 * @param targetSize the preferred size of the replica to swap with.
 * @param minSize the exclusive lower size bound for eligible replicas.
 * @param maxSize the exclusive upper size bound for eligible replicas.
 * @param clusterModel the cluster model.
 *
 * @return The replica that can be swapped with the given replica, null otherwise.
 */
Replica findReplicaToSwapWith(Replica replica,
                              NavigableSet<ReplicaWrapper> sortedReplicasToSearch,
                              double targetSize,
                              double minSize,
                              double maxSize,
                              ClusterModel clusterModel) {
  if (minSize > maxSize) {
    return null;
  }

  // Narrow the search to replicas with a size in (minSize, maxSize), both bounds exclusive.
  NavigableSet<ReplicaWrapper> inRange =
      sortedReplicasToSearch.subSet(ReplicaWrapper.greaterThan(minSize), false,
                                    ReplicaWrapper.lessThan(maxSize), false);
  if (inRange.isEmpty()) {
    // Nothing falls inside the size window.
    return null;
  }

  // upward walks from targetSize (inclusive) towards maxSize (exclusive);
  // downward walks from targetSize (inclusive) towards minSize (exclusive).
  // When targetSize lies outside the window only one direction exists, which also avoids
  // handing an out-of-range bound to tailSet() / headSet().
  Iterator<ReplicaWrapper> upward = null;
  Iterator<ReplicaWrapper> downward = null;
  if (targetSize <= minSize) {
    upward = inRange.iterator();
  } else if (targetSize >= maxSize) {
    downward = inRange.descendingIterator();
  } else {
    upward = inRange.tailSet(ReplicaWrapper.greaterThanOrEqualsTo(targetSize), true).iterator();
    downward = inRange.headSet(ReplicaWrapper.lessThanOrEqualsTo(targetSize), true).descendingIterator();
  }

  // Alternate between the two cursors, always trying whichever pending replica is closer to
  // targetSize. All three references start as null, so the first pass advances both cursors.
  ReplicaWrapper below = null;
  ReplicaWrapper above = null;
  ReplicaWrapper lastTried = null;
  for (;;) {
    // A cursor moves only if the replica it last produced is the one that was just rejected.
    boolean advanceUpward = lastTried == above;
    boolean advanceDownward = lastTried == below;
    if (advanceUpward) {
      if (upward != null && upward.hasNext()) {
        above = upward.next();
      } else {
        above = null;
      }
    }
    if (advanceDownward) {
      if (downward != null && downward.hasNext()) {
        below = downward.next();
      } else {
        below = null;
      }
    }

    if (above == null && below == null) {
      // Both directions are exhausted.
      return null;
    }
    if (above == null) {
      lastTried = below;
    } else if (below == null) {
      lastTried = above;
    } else {
      // Both sides have a pending replica; ties go to the smaller one.
      double gapBelow = targetSize - below.size();
      double gapAbove = above.size() - targetSize;
      lastTried = gapBelow <= gapAbove ? below : above;
    }

    if (canSwap(replica, lastTried.replica(), clusterModel)) {
      return lastTried.replica();
    }
  }
}
/**
 * Given each queue's preemption target, selects containers to preempt from the applications
 * of every over-capacity queue. The target is scaled by {@link #NATURAL_TERMINATION_FACTOR}
 * to account for containers that will complete naturally.
 *
 * @param queues set of leaf queues to preempt from
 * @param clusterResource total amount of cluster resources
 * @return a map of application attempt ID to the set of containers to preempt
 */
private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
    List<TempQueue> queues, Resource clusterResource) {
  Map<ApplicationAttemptId,Set<RMContainer>> preemptMap =
      new HashMap<ApplicationAttemptId,Set<RMContainer>>();
  List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();

  for (TempQueue qT : queues) {
    // Queues with preemption disabled are never touched; only report what was skipped.
    if (qT.preemptionDisabled && qT.leafQueue != null) {
      if (LOG.isDebugEnabled()
          && Resources.greaterThan(rc, clusterResource,
              qT.toBePreempted, Resource.newInstance(0, 0, 0))) {
        LOG.debug("Tried to preempt the following "
            + "resources from non-preemptable queue: "
            + qT.queueName + " - Resources: " + qT.toBePreempted);
      }
      continue;
    }

    // Act only when the queue exceeds its guarantee by more than maxIgnoredOverCapacity.
    boolean overTolerance = Resources.greaterThan(rc, clusterResource, qT.current,
        Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity));
    if (!overTolerance) {
      continue;
    }

    // Dampen the target by naturalTerminationFactor to account for containers that
    // terminate on their own.
    Resource resToObtain =
        Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
    Resource skippedAMSize = Resource.newInstance(0, 0, 0);

    // Hold the leaf queue lock while its applications are scanned.
    synchronized (qT.leafQueue) {
      NavigableSet<FiCaSchedulerApp> apps =
          (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
      Iterator<FiCaSchedulerApp> reverseApps = apps.descendingIterator();
      qT.actuallyPreempted = Resources.clone(resToObtain);

      while (reverseApps.hasNext()) {
        FiCaSchedulerApp app = reverseApps.next();
        // Stop once nothing more needs to be obtained from this queue.
        if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
            Resources.none())) {
          break;
        }
        preemptMap.put(
            app.getApplicationAttemptId(),
            preemptFrom(app, clusterResource, resToObtain,
                skippedAMContainerlist, skippedAMSize));
      }

      Resource queueShare = Resources.multiply(clusterResource,
          qT.leafQueue.getAbsoluteCapacity());
      Resource maxAMCapacityForThisQueue = Resources.multiply(
          queueShare, qT.leafQueue.getMaxAMResourcePerQueuePercent());

      // AM containers may be preempted as well (still saving at most
      // maxAMCapacityForThisQueue of AM resources) if more resources are required
      // from this queue.
      preemptAMContainers(clusterResource, preemptMap,
          skippedAMContainerlist, resToObtain, skippedAMSize,
          maxAMCapacityForThisQueue);
    }
  }
  return preemptMap;
}
/**
 * Given each queue's preemption target, selects containers to preempt from the applications
 * of every over-capacity queue. The target is scaled by {@link #NATURAL_TERMINATION_FACTOR}
 * to account for containers that will complete naturally. AM containers are never selected
 * by this method.
 *
 * @param queues set of leaf queues to preempt from
 * @param clusterResource total amount of cluster resources
 * @return a map of application attempt ID to the containers to preempt, each mapped to a
 *         resource amount as produced by {@code preemptFrom}
 */
private Map<ApplicationAttemptId,Map<RMContainer,Resource>> getContainersToPreempt(
    List<TempQueue> queues, Resource clusterResource) {
  Map<ApplicationAttemptId, Map<RMContainer,Resource>> preemptMap =
      new HashMap<ApplicationAttemptId, Map<RMContainer,Resource>>();
  List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();

  // Test-only hook: lets the test variant pre-populate the map.
  if (isTest) {
    getContainersToPreemptForTest(preemptMap, queues, clusterResource);
  }

  for (TempQueue qT : queues) {
    if (qT.preemptionDisabled && qT.leafQueue != null) {
      if (LOG.isDebugEnabled()) {
        if (Resources.greaterThan(rc, clusterResource,
            qT.toBePreempted, Resource.newInstance(0, 0))) {
          // Logged at debug level to match the isDebugEnabled() guard around it.
          LOG.debug("Tried to preempt the following "
              + "resources from non-preemptable queue: "
              + qT.queueName + " - Resources: " + qT.toBePreempted);
        }
      }
      continue;
    }
    // we act only if we are violating balance by more than
    // maxIgnoredOverCapacity
    if (Resources.greaterThan(rc, clusterResource, qT.current,
        Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
      // we introduce a dampening factor naturalTerminationFactor that
      // accounts for natural termination of containers
      Resource resToObtain =
          Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
      Resource skippedAMSize = Resource.newInstance(0, 0);

      LOG.info("try to preempt: "+resToObtain+" from queue: "+qT.queueName);
      if (resToObtain.getMemory() > 0) {
        LOG.info("resToObtain memory: "+resToObtain.getMemory());
      }

      // lock the leafqueue while we scan applications and unreserve
      synchronized (qT.leafQueue) {
        // Applications are visited in the reverse of the order in which the leaf queue
        // keeps them.
        NavigableSet<FiCaSchedulerApp> ns =
            (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
        Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
        qT.actuallyPreempted = Resources.clone(resToObtain);
        while (desc.hasNext()) {
          FiCaSchedulerApp fc = desc.next();
          // Stop once nothing more needs to be obtained from this queue.
          if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
              Resources.none())) {
            break;
          }
          LOG.info("now try to preempt applicatin:"+fc.getApplicationId());
          preemptMap.put(
              fc.getApplicationAttemptId(),
              preemptFrom(fc, clusterResource, resToObtain,
                  skippedAMContainerlist, skippedAMSize));
        }
        // AM containers are deliberately never preempted here: the former
        // preemptAMContainers(...) step sat behind a constant-false condition and could
        // not run, so that unreachable branch has been dropped.
      }
    }
  }
  return preemptMap;
}
/**
 * Exercises a NavigableSet of Integers: after clear(s), inserts 1, 3 and 5, verifies that
 * re-inserting them is rejected, probes navigation around several keys, checks both ways of
 * iterating in descending order, and finally runs the descending-iterator removal checks.
 *
 * @param s the set under test
 */
@SuppressWarnings("unchecked")
private static void testNavigableSet(NavigableSet<Integer> s) {
  clear(s);
  checkNavigableSetKeys(s, 1, null, null, null, null);

  // First insertion of each key must succeed, a second insertion must be refused.
  final int[] keys = {1, 3, 5};
  for (int key : keys) {
    check(s.add(key));
  }
  for (int key : keys) {
    check(!s.add(key));
  }

  checkNavigableSetKeys(s, 0, null, null, 1, 1);
  checkNavigableSetKeys(s, 1, null, 1, 1, 3);
  checkNavigableSetKeys(s, 2, 1, 1, 3, 3);
  checkNavigableSetKeys(s, 3, 1, 3, 3, 5);
  checkNavigableSetKeys(s, 5, 3, 5, 5, null);
  checkNavigableSetKeys(s, 6, 5, 5, null, null);

  // Both descending views must yield 5, 3, 1 and then be exhausted.
  Iterator<Integer>[] descendingViews =
      (Iterator<Integer>[])
      new Iterator<?>[] {
        s.descendingIterator(),
        s.descendingSet().iterator()};
  for (final Iterator<Integer> it : descendingViews) {
    equalNext(it, 5);
    equalNext(it, 3);
    equalNext(it, 1);
    check(!it.hasNext());
    THROWS(NoSuchElementException.class, () -> it.next());
  }

  // Removal through descendingIterator(): first, middle and last position.
  prepSetForDescItrTests(s);
  checkDescItrRmFirst(s, s.descendingIterator());
  prepSetForDescItrTests(s);
  checkDescItrRmMid(s, s.descendingIterator());
  prepSetForDescItrTests(s);
  checkDescItrRmLast(s, s.descendingIterator());

  // The same three removal checks through descendingSet().iterator().
  prepSetForDescItrTests(s);
  checkDescItrRmFirst(s, s.descendingSet().iterator());
  prepSetForDescItrTests(s);
  checkDescItrRmMid(s, s.descendingSet().iterator());
  prepSetForDescItrTests(s);
  checkDescItrRmLast(s, s.descendingSet().iterator());
}
/**
 * Returns the most recent {@link IndexValue} for {@code key} whose entry type is one of
 * {@code types}, searching index segments from the highest start offset downwards and,
 * within a segment, the values in descending order.
 * @param key the {@link StoreKey} whose {@link IndexValue} is required.
 * @param fileSpan the {@link FileSpan} bounding the search; {@code null} searches the entire index.
 * @param types the {@link IndexEntryType}s to look for. The first value encountered that matches one of them is
 *              returned.
 * @param indexSegments the map of index segment start {@link Offset} to {@link IndexSegment} instances
 * @return the matching {@link IndexValue} if one exists within the searched range, {@code null} otherwise. If the
 *         first value seen for the key is a TTL update and the match is not, a copy of the match carrying the TTL
 *         update flag and that value's expiry time is returned instead.
 * @throws StoreException if searching an index segment fails.
 */
private IndexValue findKey(StoreKey key, FileSpan fileSpan, EnumSet<IndexEntryType> types,
    ConcurrentSkipListMap<Offset, IndexSegment> indexSegments) throws StoreException {
  IndexValue latest = null;
  IndexValue retCandidate = null;
  final Timer.Context context = metrics.findTime.time();
  try {
    ConcurrentNavigableMap<Offset, IndexSegment> segmentsMapToSearch;
    if (fileSpan != null) {
      logger.trace("Searching for {} in index with filespan ranging from {} to {}", key, fileSpan.getStartOffset(),
          fileSpan.getEndOffset());
      segmentsMapToSearch = indexSegments.subMap(indexSegments.floorKey(fileSpan.getStartOffset()), true,
          indexSegments.floorKey(fileSpan.getEndOffset()), true).descendingMap();
      metrics.segmentSizeForExists.update(segmentsMapToSearch.size());
    } else {
      logger.trace("Searching for {} in the entire index", key);
      segmentsMapToSearch = indexSegments.descendingMap();
    }

    int segmentsSearched = 0;
    for (Map.Entry<Offset, IndexSegment> segmentEntry : segmentsMapToSearch.entrySet()) {
      segmentsSearched++;
      logger.trace("Index : {} searching index with start offset {}", dataDir, segmentEntry.getKey());
      NavigableSet<IndexValue> values = segmentEntry.getValue().find(key);
      if (values == null) {
        continue;
      }

      Iterator<IndexValue> newestFirst = values.descendingIterator();
      while (newestFirst.hasNext()) {
        IndexValue value = newestFirst.next();
        // Remember the very first value seen for the key; it is consulted when merging below.
        if (latest == null) {
          latest = value;
        }
        logger.trace("Index : {} found value offset {} size {} ttl {}", dataDir, value.getOffset(), value.getSize(),
            value.getExpiresAtMs());
        // A value counts as a TTL update only if it is neither a delete nor an undelete.
        // note that it is not possible for a TTL update record to exist for a key but not have a PUT or DELETE
        // record.
        boolean wanted = (types.contains(IndexEntryType.DELETE) && value.isDelete())
            || (types.contains(IndexEntryType.UNDELETE) && value.isUndelete())
            || (types.contains(IndexEntryType.TTL_UPDATE) && !value.isDelete() && !value.isUndelete()
            && value.isTtlUpdate())
            || (types.contains(IndexEntryType.PUT) && value.isPut());
        if (wanted) {
          retCandidate = value;
          break;
        }
      }

      if (retCandidate == null) {
        // Nothing suitable in this segment; move on to the next one.
        continue;
      }
      // merge entries if required to account for updated fields
      if (latest.isTtlUpdate() && !retCandidate.isTtlUpdate()) {
        retCandidate = new IndexValue(retCandidate.getOffset().getName(), retCandidate.getBytes(),
            retCandidate.getFormatVersion());
        retCandidate.setFlag(IndexValue.Flags.Ttl_Update_Index);
        retCandidate.setExpiresAtMs(latest.getExpiresAtMs());
      }
      break;
    }
    metrics.segmentsAccessedPerBlobCount.update(segmentsSearched);
  } finally {
    context.stop();
  }
  if (retCandidate != null) {
    logger.trace("Index : {} Returning value offset {} size {} ttl {}", dataDir, retCandidate.getOffset(),
        retCandidate.getSize(), retCandidate.getExpiresAtMs());
  }
  return retCandidate;
}
/**
 * Collects every {@link IndexValue} for {@code key} whose entry type is one of {@code types}, visiting index
 * segments from the highest start offset downwards and, within a segment, the values in descending order.
 * @param key the {@link StoreKey} whose {@link IndexValue}s are required.
 * @param fileSpan the {@link FileSpan} bounding the search; {@code null} searches the entire index.
 * @param types the {@link IndexEntryType}s to look for.
 * @param indexSegments the map of index segment start {@link Offset} to {@link IndexSegment} instances
 * @return copies of the matching {@link IndexValue}s in the order they were encountered (most to least recent), or
 *         {@code null} if nothing matched.
 * @throws StoreException any error.
 */
List<IndexValue> findAllIndexValuesForKey(StoreKey key, FileSpan fileSpan, EnumSet<IndexEntryType> types,
    ConcurrentSkipListMap<Offset, IndexSegment> indexSegments) throws StoreException {
  List<IndexValue> matches = new ArrayList<>();
  final Timer.Context context = metrics.findTime.time();
  try {
    ConcurrentNavigableMap<Offset, IndexSegment> segmentsMapToSearch;
    if (fileSpan != null) {
      logger.trace("Searching all indexes for {} in index with filespan ranging from {} to {}", key,
          fileSpan.getStartOffset(), fileSpan.getEndOffset());
      segmentsMapToSearch = indexSegments.subMap(indexSegments.floorKey(fileSpan.getStartOffset()), true,
          indexSegments.floorKey(fileSpan.getEndOffset()), true).descendingMap();
      metrics.segmentSizeForExists.update(segmentsMapToSearch.size());
    } else {
      logger.trace("Searching all indexes for {} in the entire index", key);
      segmentsMapToSearch = indexSegments.descendingMap();
    }

    int segmentsSearched = 0;
    for (Map.Entry<Offset, IndexSegment> segmentEntry : segmentsMapToSearch.entrySet()) {
      segmentsSearched++;
      logger.trace("Index : {} searching all indexes with start offset {}", dataDir, segmentEntry.getKey());
      NavigableSet<IndexValue> values = segmentEntry.getValue().find(key);
      if (values == null) {
        continue;
      }
      for (Iterator<IndexValue> newestFirst = values.descendingIterator(); newestFirst.hasNext(); ) {
        IndexValue value = newestFirst.next();
        boolean wanted;
        if (types.contains(IndexEntryType.DELETE) && value.isDelete()) {
          wanted = true;
        } else if (types.contains(IndexEntryType.UNDELETE) && value.isUndelete()) {
          wanted = true;
        } else if (types.contains(IndexEntryType.TTL_UPDATE) && !value.isDelete() && !value.isUndelete()
            && value.isTtlUpdate()) {
          // A value counts as a TTL update only if it is neither a delete nor an undelete.
          wanted = true;
        } else {
          wanted = types.contains(IndexEntryType.PUT) && value.isPut();
        }
        if (wanted) {
          // Hand out a copy because the caller receives a modifiable list.
          matches.add(new IndexValue(value));
        }
      }
    }
    metrics.segmentsAccessedPerBlobCount.update(segmentsSearched);
  } finally {
    context.stop();
  }
  if (matches.isEmpty()) {
    return null;
  }
  logger.trace("Index: {} Returning values {}", dataDir, matches);
  return matches;
}