下面列出了java.util.concurrent.ConcurrentNavigableMap#entrySet ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public boolean flush() {
ConcurrentNavigableMap<Long, LogSegment> beingFlushView = findBeingFlushView();
int lastOffset = -1;
long lastBaseOffset = -1;
for (Map.Entry<Long, LogSegment> entry : beingFlushView.entrySet()) {
try {
LogSegment segment = entry.getValue();
lastOffset = segment.flush();
lastBaseOffset = segment.getBaseOffset();
} catch (Exception e) {
break;
}
}
if (lastBaseOffset == -1 || lastOffset == -1) return false;
final long where = lastBaseOffset + lastOffset;
boolean result = where != this.flushedOffset;
this.flushedOffset = where;
return result;
}
/**
* entrySet contains all pairs
*/
public void testEntrySet() {
ConcurrentNavigableMap map = map5();
Set s = map.entrySet();
assertEquals(5, s.size());
Iterator it = s.iterator();
while (it.hasNext()) {
Map.Entry e = (Map.Entry) it.next();
assertTrue(
(e.getKey().equals(one) && e.getValue().equals("A")) ||
(e.getKey().equals(two) && e.getValue().equals("B")) ||
(e.getKey().equals(three) && e.getValue().equals("C")) ||
(e.getKey().equals(four) && e.getValue().equals("D")) ||
(e.getKey().equals(five) && e.getValue().equals("E")));
}
}
/**
* entrySet contains all pairs
*/
public void testDescendingEntrySet() {
ConcurrentNavigableMap map = dmap5();
Set s = map.entrySet();
assertEquals(5, s.size());
Iterator it = s.iterator();
while (it.hasNext()) {
Map.Entry e = (Map.Entry) it.next();
assertTrue(
(e.getKey().equals(m1) && e.getValue().equals("A")) ||
(e.getKey().equals(m2) && e.getValue().equals("B")) ||
(e.getKey().equals(m3) && e.getValue().equals("C")) ||
(e.getKey().equals(m4) && e.getValue().equals("D")) ||
(e.getKey().equals(m5) && e.getValue().equals("E")));
}
}
/**
* entrySet contains all pairs
*/
public void testEntrySet() {
ConcurrentNavigableMap map = map5();
Set s = map.entrySet();
assertEquals(5, s.size());
Iterator it = s.iterator();
while (it.hasNext()) {
Map.Entry e = (Map.Entry) it.next();
assertTrue(
(e.getKey().equals(one) && e.getValue().equals("A")) ||
(e.getKey().equals(two) && e.getValue().equals("B")) ||
(e.getKey().equals(three) && e.getValue().equals("C")) ||
(e.getKey().equals(four) && e.getValue().equals("D")) ||
(e.getKey().equals(five) && e.getValue().equals("E")));
}
}
/**
* entrySet contains all pairs
*/
public void testEntrySet() {
ConcurrentNavigableMap map = map5();
Set s = map.entrySet();
assertEquals(5, s.size());
Iterator it = s.iterator();
while (it.hasNext()) {
Map.Entry e = (Map.Entry) it.next();
assertTrue(
(e.getKey().equals(one) && e.getValue().equals("A")) ||
(e.getKey().equals(two) && e.getValue().equals("B")) ||
(e.getKey().equals(three) && e.getValue().equals("C")) ||
(e.getKey().equals(four) && e.getValue().equals("D")) ||
(e.getKey().equals(five) && e.getValue().equals("E")));
}
}
/**
* entrySet contains all pairs
*/
public void testDescendingEntrySet() {
ConcurrentNavigableMap map = dmap5();
Set s = map.entrySet();
assertEquals(5, s.size());
Iterator it = s.iterator();
while (it.hasNext()) {
Map.Entry e = (Map.Entry) it.next();
assertTrue(
(e.getKey().equals(m1) && e.getValue().equals("A")) ||
(e.getKey().equals(m2) && e.getValue().equals("B")) ||
(e.getKey().equals(m3) && e.getValue().equals("C")) ||
(e.getKey().equals(m4) && e.getValue().equals("D")) ||
(e.getKey().equals(m5) && e.getValue().equals("E")));
}
}
/**
* entrySet contains all pairs
*/
public void testEntrySet() {
ConcurrentNavigableMap map = map5();
Set s = map.entrySet();
assertEquals(5, s.size());
Iterator it = s.iterator();
while (it.hasNext()) {
Map.Entry e = (Map.Entry) it.next();
assertTrue(
(e.getKey().equals(one) && e.getValue().equals("A")) ||
(e.getKey().equals(two) && e.getValue().equals("B")) ||
(e.getKey().equals(three) && e.getValue().equals("C")) ||
(e.getKey().equals(four) && e.getValue().equals("D")) ||
(e.getKey().equals(five) && e.getValue().equals("E")));
}
}
/**
* entrySet contains all pairs
*/
public void testDescendingEntrySet() {
ConcurrentNavigableMap map = dmap5();
Set s = map.entrySet();
assertEquals(5, s.size());
Iterator it = s.iterator();
while (it.hasNext()) {
Map.Entry e = (Map.Entry) it.next();
assertTrue(
(e.getKey().equals(m1) && e.getValue().equals("A")) ||
(e.getKey().equals(m2) && e.getValue().equals("B")) ||
(e.getKey().equals(m3) && e.getValue().equals("C")) ||
(e.getKey().equals(m4) && e.getValue().equals("D")) ||
(e.getKey().equals(m5) && e.getValue().equals("E")));
}
}
/**
* entrySet contains all pairs
*/
public void testEntrySet() {
ConcurrentNavigableMap map = map5();
Set s = map.entrySet();
assertEquals(5, s.size());
Iterator it = s.iterator();
while (it.hasNext()) {
Map.Entry e = (Map.Entry) it.next();
assertTrue(
(e.getKey().equals(one) && e.getValue().equals("A")) ||
(e.getKey().equals(two) && e.getValue().equals("B")) ||
(e.getKey().equals(three) && e.getValue().equals("C")) ||
(e.getKey().equals(four) && e.getValue().equals("D")) ||
(e.getKey().equals(five) && e.getValue().equals("E")));
}
}
/**
* entrySet contains all pairs
*/
public void testDescendingEntrySet() {
ConcurrentNavigableMap map = dmap5();
Set s = map.entrySet();
assertEquals(5, s.size());
Iterator it = s.iterator();
while (it.hasNext()) {
Map.Entry e = (Map.Entry) it.next();
assertTrue(
(e.getKey().equals(m1) && e.getValue().equals("A")) ||
(e.getKey().equals(m2) && e.getValue().equals("B")) ||
(e.getKey().equals(m3) && e.getValue().equals("C")) ||
(e.getKey().equals(m4) && e.getValue().equals("D")) ||
(e.getKey().equals(m5) && e.getValue().equals("E")));
}
}
@Override
public void scan(final byte[] startKey, final byte[] endKey, final int limit,
@SuppressWarnings("unused") final boolean readOnlySafe, final boolean returnValue,
final KVStoreClosure closure) {
final Timer.Context timeCtx = getTimeContext("SCAN");
final List<KVEntry> entries = Lists.newArrayList();
final int maxCount = normalizeLimit(limit);
final ConcurrentNavigableMap<byte[], byte[]> subMap;
final byte[] realStartKey = BytesUtil.nullToEmpty(startKey);
if (endKey == null) {
subMap = this.defaultDB.tailMap(realStartKey);
} else {
subMap = this.defaultDB.subMap(realStartKey, endKey);
}
try {
for (final Map.Entry<byte[], byte[]> entry : subMap.entrySet()) {
entries.add(new KVEntry(entry.getKey(), returnValue ? entry.getValue() : null));
if (entries.size() >= maxCount) {
break;
}
}
setSuccess(closure, entries);
} catch (final Exception e) {
LOG.error("Fail to [SCAN], range: ['[{}, {})'], {}.", BytesUtil.toHex(startKey), BytesUtil.toHex(endKey),
StackTraceUtil.stackTrace(e));
setFailure(closure, "Fail to [SCAN]");
} finally {
timeCtx.stop();
}
}
@Override
public void reverseScan(final byte[] startKey, final byte[] endKey, final int limit,
@SuppressWarnings("unused") final boolean readOnlySafe, final boolean returnValue,
final KVStoreClosure closure) {
final Timer.Context timeCtx = getTimeContext("REVERSE_SCAN");
final List<KVEntry> entries = Lists.newArrayList();
final int maxCount = normalizeLimit(limit);
final ConcurrentNavigableMap<byte[], byte[]> subMap;
final byte[] realEndKey = BytesUtil.nullToEmpty(endKey);
if (startKey == null) {
subMap = this.defaultDB.descendingMap().headMap(realEndKey);
} else {
subMap = this.defaultDB.descendingMap().subMap(startKey, realEndKey);
}
try {
for (final Map.Entry<byte[], byte[]> entry : subMap.entrySet()) {
entries.add(new KVEntry(entry.getKey(), returnValue ? entry.getValue() : null));
if (entries.size() >= maxCount) {
break;
}
}
setSuccess(closure, entries);
} catch (final Exception e) {
LOG.error("Fail to [REVERSE_SCAN], range: ['[{}, {})'], {}.", BytesUtil.toHex(startKey),
BytesUtil.toHex(endKey), StackTraceUtil.stackTrace(e));
setFailure(closure, "Fail to [REVERSE_SCAN]");
} finally {
timeCtx.stop();
}
}
/**
* Gets all the entries from the journal starting at the provided offset and till the maxEntriesToReturn or the
* end of the journal is reached.
* @param offset The {@link Offset} from where the journal needs to return entries.
* @param inclusive if {@code true}, the returned entries (if not {@code null}), contain the entry at {@code offset}.
* @return The entries in the journal starting from offset. If the offset is outside the range of the journal,
* it returns null.
*/
List<JournalEntry> getEntriesSince(Offset offset, boolean inclusive) {
// To prevent synchronizing the addEntry method, we first get all the entries from the journal that are greater
// than offset. Once we have all the required entries, we finally check if the offset is actually present
// in the journal. If the offset is not present we return null, else we return the entries we got in the first step.
// The offset may not be present in the journal as it could be removed.
Map.Entry<Offset, StoreKey> first = journal.firstEntry();
Map.Entry<Offset, StoreKey> last = journal.lastEntry();
// check if the journal contains the offset.
if (first == null || offset.compareTo(first.getKey()) < 0 || last == null || offset.compareTo(last.getKey()) > 0
|| !journal.containsKey(offset)) {
return null;
}
ConcurrentNavigableMap<Offset, StoreKey> subsetMap = journal.tailMap(offset, true);
int entriesToReturn = Math.min(subsetMap.size(), maxEntriesToReturn);
List<JournalEntry> journalEntries = new ArrayList<JournalEntry>(entriesToReturn);
int entriesAdded = 0;
for (Map.Entry<Offset, StoreKey> entry : subsetMap.entrySet()) {
if (inclusive || !entry.getKey().equals(offset)) {
journalEntries.add(new JournalEntry(entry.getKey(), entry.getValue()));
entriesAdded++;
if (entriesAdded == entriesToReturn) {
break;
}
}
}
// Ensure that the offset was not pushed out of the journal.
first = journal.firstEntry();
if (first == null || offset.compareTo(first.getKey()) < 0) {
return null;
}
logger.trace("Journal : {} entries returned {}", dataDir, journalEntries.size());
return journalEntries;
}
/**
* Persist last flushed sequence id of each region to HDFS
* @throws IOException if persit to HDFS fails
*/
private void persistRegionLastFlushedSequenceIds() throws IOException {
if (isFlushSeqIdPersistInProgress) {
return;
}
isFlushSeqIdPersistInProgress = true;
try {
Configuration conf = master.getConfiguration();
Path rootDir = CommonFSUtils.getRootDir(conf);
Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(lastFlushedSeqIdPath)) {
LOG.info("Rewriting .lastflushedseqids file at: "
+ lastFlushedSeqIdPath);
if (!fs.delete(lastFlushedSeqIdPath, false)) {
throw new IOException("Unable to remove existing "
+ lastFlushedSeqIdPath);
}
} else {
LOG.info("Writing .lastflushedseqids file at: " + lastFlushedSeqIdPath);
}
FSDataOutputStream out = fs.create(lastFlushedSeqIdPath);
FlushedSequenceId.Builder flushedSequenceIdBuilder =
FlushedSequenceId.newBuilder();
try {
for (Entry<byte[], Long> entry : flushedSequenceIdByRegion.entrySet()) {
FlushedRegionSequenceId.Builder flushedRegionSequenceIdBuilder =
FlushedRegionSequenceId.newBuilder();
flushedRegionSequenceIdBuilder.setRegionEncodedName(
ByteString.copyFrom(entry.getKey()));
flushedRegionSequenceIdBuilder.setSeqId(entry.getValue());
ConcurrentNavigableMap<byte[], Long> storeSeqIds =
storeFlushedSequenceIdsByRegion.get(entry.getKey());
if (storeSeqIds != null) {
for (Entry<byte[], Long> store : storeSeqIds.entrySet()) {
FlushedStoreSequenceId.Builder flushedStoreSequenceIdBuilder =
FlushedStoreSequenceId.newBuilder();
flushedStoreSequenceIdBuilder.setFamily(ByteString.copyFrom(store.getKey()));
flushedStoreSequenceIdBuilder.setSeqId(store.getValue());
flushedRegionSequenceIdBuilder.addStores(flushedStoreSequenceIdBuilder);
}
}
flushedSequenceIdBuilder.addRegionSequenceId(flushedRegionSequenceIdBuilder);
}
flushedSequenceIdBuilder.build().writeDelimitedTo(out);
} finally {
if (out != null) {
out.close();
}
}
} finally {
isFlushSeqIdPersistInProgress = false;
}
}
/**
* Finds the latest {@link IndexValue} associated with the {@code key} that matches any of the provided {@code types}
* if present in the index within the given {@code fileSpan}.
* @param key the {@link StoreKey} whose {@link IndexValue} is required.
* @param fileSpan {@link FileSpan} which specifies the range within which search should be made
* @param types the types of {@link IndexEntryType} to look for. The latest entry matching one of the types will be
* returned
* @param indexSegments the map of index segment start {@link Offset} to {@link IndexSegment} instances
* @return The latest {@link IndexValue} for {@code key} conforming to one of the types {@code types} - if one exists
* within the {@code fileSpan}, {@code null} otherwise.
* @throws StoreException
*/
private IndexValue findKey(StoreKey key, FileSpan fileSpan, EnumSet<IndexEntryType> types,
ConcurrentSkipListMap<Offset, IndexSegment> indexSegments) throws StoreException {
IndexValue latest = null;
IndexValue retCandidate = null;
final Timer.Context context = metrics.findTime.time();
try {
ConcurrentNavigableMap<Offset, IndexSegment> segmentsMapToSearch;
if (fileSpan == null) {
logger.trace("Searching for {} in the entire index", key);
segmentsMapToSearch = indexSegments.descendingMap();
} else {
logger.trace("Searching for {} in index with filespan ranging from {} to {}", key, fileSpan.getStartOffset(),
fileSpan.getEndOffset());
segmentsMapToSearch = indexSegments.subMap(indexSegments.floorKey(fileSpan.getStartOffset()), true,
indexSegments.floorKey(fileSpan.getEndOffset()), true).descendingMap();
metrics.segmentSizeForExists.update(segmentsMapToSearch.size());
}
int segmentsSearched = 0;
for (Map.Entry<Offset, IndexSegment> entry : segmentsMapToSearch.entrySet()) {
segmentsSearched++;
logger.trace("Index : {} searching index with start offset {}", dataDir, entry.getKey());
NavigableSet<IndexValue> values = entry.getValue().find(key);
if (values != null) {
Iterator<IndexValue> it = values.descendingIterator();
while (it.hasNext()) {
IndexValue value = it.next();
if (latest == null) {
latest = value;
}
logger.trace("Index : {} found value offset {} size {} ttl {}", dataDir, value.getOffset(), value.getSize(),
value.getExpiresAtMs());
if (types.contains(IndexEntryType.DELETE) && value.isDelete()) {
retCandidate = value;
break;
} else if (types.contains(IndexEntryType.UNDELETE) && value.isUndelete()) {
retCandidate = value;
break;
} else if (types.contains(IndexEntryType.TTL_UPDATE) && !value.isDelete() && !value.isUndelete()
&& value.isTtlUpdate()) {
retCandidate = value;
break;
} else if (types.contains(IndexEntryType.PUT) && value.isPut()) {
retCandidate = value;
break;
}
// note that it is not possible for a TTL update record to exist for a key but not have a PUT or DELETE
// record.
}
if (retCandidate != null) {
// merge entries if required to account for updated fields
if (latest.isTtlUpdate() && !retCandidate.isTtlUpdate()) {
retCandidate = new IndexValue(retCandidate.getOffset().getName(), retCandidate.getBytes(),
retCandidate.getFormatVersion());
retCandidate.setFlag(IndexValue.Flags.Ttl_Update_Index);
retCandidate.setExpiresAtMs(latest.getExpiresAtMs());
}
break;
}
}
}
metrics.segmentsAccessedPerBlobCount.update(segmentsSearched);
} finally {
context.stop();
}
if (retCandidate != null) {
logger.trace("Index : {} Returning value offset {} size {} ttl {}", dataDir, retCandidate.getOffset(),
retCandidate.getSize(), retCandidate.getExpiresAtMs());
}
return retCandidate;
}
/**
* Finds all the {@link IndexValue}s associated with the given {@code key} that matches any of the provided {@code types}
* if present in the index with the given {@code fileSpan} and return them in reversed chronological order. If there is
* no matched {@link IndexValue}, this method would return null;
* @param key the {@link StoreKey} whose {@link IndexValue} is required.
* @param fileSpan {@link FileSpan} which specifies the range within which search should be made.
* @param types the types of {@link IndexEntryType} to look for.
* @param indexSegments the map of index segment start {@link Offset} to {@link IndexSegment} instances
* @return The list of the {@link IndexValue}s for {@code key} conforming to one of the types {@code types} ordered
* from most to least recent.
* @throws StoreException any error.
*/
List<IndexValue> findAllIndexValuesForKey(StoreKey key, FileSpan fileSpan, EnumSet<IndexEntryType> types,
ConcurrentSkipListMap<Offset, IndexSegment> indexSegments) throws StoreException {
List<IndexValue> result = new ArrayList<>();
final Timer.Context context = metrics.findTime.time();
try {
ConcurrentNavigableMap<Offset, IndexSegment> segmentsMapToSearch;
if (fileSpan == null) {
logger.trace("Searching all indexes for {} in the entire index", key);
segmentsMapToSearch = indexSegments.descendingMap();
} else {
logger.trace("Searching all indexes for {} in index with filespan ranging from {} to {}", key,
fileSpan.getStartOffset(), fileSpan.getEndOffset());
segmentsMapToSearch = indexSegments.subMap(indexSegments.floorKey(fileSpan.getStartOffset()), true,
indexSegments.floorKey(fileSpan.getEndOffset()), true).descendingMap();
metrics.segmentSizeForExists.update(segmentsMapToSearch.size());
}
int segmentsSearched = 0;
for (Map.Entry<Offset, IndexSegment> entry : segmentsMapToSearch.entrySet()) {
segmentsSearched++;
logger.trace("Index : {} searching all indexes with start offset {}", dataDir, entry.getKey());
NavigableSet<IndexValue> values = entry.getValue().find(key);
if (values != null) {
Iterator<IndexValue> it = values.descendingIterator();
while (it.hasNext()) {
IndexValue value = it.next();
if ((types.contains(IndexEntryType.DELETE) && value.isDelete()) || (types.contains(IndexEntryType.UNDELETE)
&& value.isUndelete()) || (types.contains(IndexEntryType.TTL_UPDATE) && !value.isDelete()
&& !value.isUndelete() && value.isTtlUpdate()) || (types.contains(IndexEntryType.PUT)
&& value.isPut())) {
// Add a copy of the value to the result since we return a modifiable list to the caller.
result.add(new IndexValue(value));
}
}
}
}
metrics.segmentsAccessedPerBlobCount.update(segmentsSearched);
} finally {
context.stop();
}
if (!result.isEmpty()) {
logger.trace("Index: {} Returning values {}", dataDir, result);
}
return result.isEmpty() ? null : result;
}