下面列出了java.util.NavigableMap#firstEntry ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Evaluates every contiguous range of log segments (each start segment paired with each end segment at or after
 * it) and picks the range whose cost benefit ratio compares lowest.
 * <p>
 * A range is only considered if its benefit is at least
 * {@code storeConfig.storeMinLogSegmentCountToReclaimToTriggerCompaction}. When two ranges have equal ratios, the
 * one examined first is kept.
 * @param validDataPerLogSegments the valid data size for each log segment in the form of a {@link NavigableMap} of
 *                                segment names to valid data sizes.
 * @param segmentCapacity Segment capacity of one {@link LogSegment}
 * @param segmentHeaderSize Segment header size of a {@link LogSegment}
 * @param maxBlobSize max blob size to factor in when calculating the cost benefit ratio
 * @return the {@link CostBenefitInfo} for the best candidate to compact. {@code null} if there isn't any.
 */
private CostBenefitInfo getBestCandidateToCompact(NavigableMap<String, Long> validDataPerLogSegments,
    long segmentCapacity, long segmentHeaderSize, long maxBlobSize) {
  Map.Entry<String, Long> rangeStart = validDataPerLogSegments.firstEntry();
  final Map.Entry<String, Long> newestSegment = validDataPerLogSegments.lastEntry();
  CostBenefitInfo best = null;
  // outer loop: walk the start of the range forward through the map
  for (; rangeStart != null; rangeStart = validDataPerLogSegments.higherEntry(rangeStart.getKey())) {
    final String startName = rangeStart.getKey();
    // inner loop: walk the end of the range backward from the last segment until it passes the start
    for (Map.Entry<String, Long> rangeEnd = newestSegment;
        rangeEnd != null && LogSegmentNameHelper.COMPARATOR.compare(startName, rangeEnd.getKey()) <= 0;
        rangeEnd = validDataPerLogSegments.lowerEntry(rangeEnd.getKey())) {
      CostBenefitInfo candidate =
          getCostBenefitInfo(startName, rangeEnd.getKey(), validDataPerLogSegments, segmentCapacity,
              segmentHeaderSize, maxBlobSize);
      if (candidate.getBenefit() < storeConfig.storeMinLogSegmentCountToReclaimToTriggerCompaction) {
        // does not reclaim enough to be worth considering
        continue;
      }
      if (best == null || candidate.getCostBenefitRatio().compareTo(best.getCostBenefitRatio()) < 0) {
        best = candidate;
        logger.trace("Updating best candidate to compact to {} ", best);
      }
    }
  }
  return best;
}
/**
 * Asserts that {@code put} addresses the expected row and carries exactly one column family holding exactly one
 * cell with the expected qualifier and value.
 * <p>
 * All byte arrays are decoded explicitly as UTF-8 so the comparison does not depend on the platform default charset.
 *
 * @param row expected row key
 * @param columnFamily expected name of the single column family
 * @param columnQualifier expected qualifier of the single cell
 * @param content expected value of the single cell
 * @param put the put operation to inspect
 */
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, Put put) {
    // fully qualified so that this method does not rely on any particular import being present
    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

    assertEquals(row, new String(put.getRow(), utf8));

    NavigableMap<byte[], List<Cell>> familyCells = put.getFamilyCellMap();
    assertEquals(1, familyCells.size());

    Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
    assertEquals(columnFamily, new String(entry.getKey(), utf8));
    assertEquals(1, entry.getValue().size());

    // qualifier and value live inside larger backing arrays, hence the offset/length decoding
    Cell cell = entry.getValue().get(0);
    assertEquals(columnQualifier,
            new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), utf8));
    assertEquals(content,
            new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), utf8));
}
/**
 * Successive pollFirstEntry calls remove and return entries in order, and
 * return null once the map has been drained
 */
public void testPollFirstEntry() {
    NavigableMap map = map5();

    Map.Entry first = map.pollFirstEntry();
    assertEquals(one, first.getKey());
    assertEquals("A", first.getValue());

    Map.Entry second = map.pollFirstEntry();
    assertEquals(two, second.getKey());

    // putting key one back makes it the first entry again
    map.put(one, "A");
    Map.Entry reinserted = map.pollFirstEntry();
    assertEquals(one, reinserted.getKey());
    assertEquals("A", reinserted.getValue());

    Map.Entry third = map.pollFirstEntry();
    assertEquals(three, third.getKey());

    // with four removed explicitly, the next poll is expected to yield five
    map.remove(four);
    Map.Entry last = map.pollFirstEntry();
    assertEquals(five, last.getKey());

    // the polled entry is expected to reject modification
    try {
        last.setValue("A");
        shouldThrow();
    } catch (UnsupportedOperationException success) {}

    // nothing is left: both the peek and the poll must report null
    assertTrue(map.isEmpty());
    Map.Entry peeked = map.firstEntry();
    assertNull(peeked);
    Map.Entry polledFromEmpty = map.pollFirstEntry();
    assertNull(polledFromEmpty);
}
/**
 * Asynchronously commits, for the given partition, the offset one past the value of the first entry in
 * {@code ackRanges}. Does nothing when {@code ackRanges} is empty.
 *
 * @param topicPartition the partition whose offset is committed
 * @param ackRanges the acknowledgement registry for that partition
 */
private void manualCommit(TopicPartition topicPartition, NavigableMap<Long, Long> ackRanges) {
    // the first entry in the acknowledgement registry keeps track of the lowest possible
    // offset that can be committed
    final Map.Entry<Long, Long> lowestRange = ackRanges.firstEntry();
    if (lowestRange == null) {
        return;
    }
    final OffsetAndMetadata offsetToCommit = new OffsetAndMetadata(lowestRange.getValue() + 1);
    // no completion callback is registered
    consumer.commitAsync(Collections.singletonMap(topicPartition, offsetToCommit), null);
}
/**
 * pollFirstEntry hands back entries in order and yields null on an empty map
 */
public void testPollFirstEntry() {
    NavigableMap map = map5();

    Map.Entry polled = map.pollFirstEntry();
    assertEquals(one, polled.getKey());
    assertEquals("A", polled.getValue());

    polled = map.pollFirstEntry();
    assertEquals(two, polled.getKey());

    // re-adding key one: it must be the next entry polled
    map.put(one, "A");
    polled = map.pollFirstEntry();
    assertEquals(one, polled.getKey());
    assertEquals("A", polled.getValue());

    polled = map.pollFirstEntry();
    assertEquals(three, polled.getKey());

    // four is removed directly, so the following poll should skip to five
    map.remove(four);
    polled = map.pollFirstEntry();
    assertEquals(five, polled.getKey());

    // writing through the polled entry is expected to be unsupported
    try {
        polled.setValue("A");
        shouldThrow();
    } catch (UnsupportedOperationException success) {}

    // the map has been drained
    assertTrue(map.isEmpty());
    Map.Entry head = map.firstEntry();
    assertNull(head);
    polled = map.pollFirstEntry();
    assertNull(polled);
}
/**
 * Checks that {@code put} is for the expected row and contains a single column family with a single cell whose
 * qualifier and value match the expected strings.
 * <p>
 * Bytes are decoded as UTF-8 explicitly rather than with the platform default charset, so the result is the same
 * on every machine.
 *
 * @param row expected row key
 * @param columnFamily expected name of the single column family
 * @param columnQualifier expected qualifier of the single cell
 * @param content expected value of the single cell
 * @param put the put operation to inspect
 */
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, Put put) {
    // fully qualified so that this method does not rely on any particular import being present
    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

    assertEquals(row, new String(put.getRow(), utf8));

    NavigableMap<byte[], List<Cell>> familyCells = put.getFamilyCellMap();
    assertEquals(1, familyCells.size());

    Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
    assertEquals(columnFamily, new String(entry.getKey(), utf8));
    assertEquals(1, entry.getValue().size());

    // qualifier and value are slices of backing arrays, so decode only the [offset, offset + length) window
    Cell cell = entry.getValue().get(0);
    assertEquals(columnQualifier,
            new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), utf8));
    assertEquals(content,
            new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), utf8));
}
/**
 * Verifies a captured {@code Put}: it must target the expected row and hold one column family containing one
 * cell with the expected qualifier and value.
 * <p>
 * Decoding uses UTF-8 explicitly; the charset-less {@code String} constructors would use the platform default.
 *
 * @param row expected row key
 * @param columnFamily expected name of the single column family
 * @param columnQualifier expected qualifier of the single cell
 * @param content expected value of the single cell
 * @param put the put operation to inspect
 */
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, Put put) {
    // fully qualified so that this method does not rely on any particular import being present
    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

    assertEquals(row, new String(put.getRow(), utf8));

    NavigableMap<byte[], List<Cell>> familyCells = put.getFamilyCellMap();
    assertEquals(1, familyCells.size());

    Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
    assertEquals(columnFamily, new String(entry.getKey(), utf8));
    assertEquals(1, entry.getValue().size());

    // the cell exposes qualifier and value as (array, offset, length) triples
    Cell cell = entry.getValue().get(0);
    assertEquals(columnQualifier,
            new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), utf8));
    assertEquals(content,
            new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), utf8));
}
/**
 * Asserts the shape and content of {@code put}: expected row key, exactly one column family, and exactly one cell
 * in that family with the expected qualifier and value.
 * <p>
 * Every byte-to-string conversion names UTF-8 explicitly so the assertions are independent of the platform default
 * charset.
 *
 * @param row expected row key
 * @param columnFamily expected name of the single column family
 * @param columnQualifier expected qualifier of the single cell
 * @param content expected value of the single cell
 * @param put the put operation to inspect
 */
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, Put put) {
    // fully qualified so that this method does not rely on any particular import being present
    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

    assertEquals(row, new String(put.getRow(), utf8));

    NavigableMap<byte[], List<Cell>> familyCells = put.getFamilyCellMap();
    assertEquals(1, familyCells.size());

    Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
    assertEquals(columnFamily, new String(entry.getKey(), utf8));
    assertEquals(1, entry.getValue().size());

    // decode only the qualifier/value window of each backing array
    Cell cell = entry.getValue().get(0);
    assertEquals(columnQualifier,
            new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), utf8));
    assertEquals(content,
            new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), utf8));
}
/**
 * Confirms that {@code put} was built for the expected row and contains one column family with one cell carrying
 * the expected qualifier and value.
 * <p>
 * UTF-8 is specified for all decoding so the test does not silently depend on the platform default charset.
 *
 * @param row expected row key
 * @param columnFamily expected name of the single column family
 * @param columnQualifier expected qualifier of the single cell
 * @param content expected value of the single cell
 * @param put the put operation to inspect
 */
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, Put put) {
    // fully qualified so that this method does not rely on any particular import being present
    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

    assertEquals(row, new String(put.getRow(), utf8));

    NavigableMap<byte[], List<Cell>> familyCells = put.getFamilyCellMap();
    assertEquals(1, familyCells.size());

    Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
    assertEquals(columnFamily, new String(entry.getKey(), utf8));
    assertEquals(1, entry.getValue().size());

    // qualifier and value are read from their backing arrays using the cell's offset and length
    Cell cell = entry.getValue().get(0);
    assertEquals(columnQualifier,
            new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), utf8));
    assertEquals(content,
            new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), utf8));
}
/**
 * Sends two flow files for the same row and column through the client service and checks that the mocked table
 * receives a single put call containing a single {@code Put} whose first column family holds two cells.
 */
@Test
public void testMultiplePutsSameRow() throws IOException, InitializationException {
    final String tableName = "nifi";
    final String row = "row1";
    final String columnFamily = "family1";
    final String columnQualifier = "qualifier1";
    final String content1 = "content1";
    final String content2 = "content2";

    // first flow file: row1 / family1:qualifier1 = content1
    final PutColumn firstColumn = new PutColumn(
            columnFamily.getBytes(StandardCharsets.UTF_8),
            columnQualifier.getBytes(StandardCharsets.UTF_8),
            content1.getBytes(StandardCharsets.UTF_8));
    final Collection<PutColumn> firstColumns = Collections.singletonList(firstColumn);
    final PutFlowFile firstFlowFile =
            new PutFlowFile(tableName, row.getBytes(StandardCharsets.UTF_8), firstColumns, null);

    // second flow file: same row and column, different content
    final PutColumn secondColumn = new PutColumn(
            columnFamily.getBytes(StandardCharsets.UTF_8),
            columnQualifier.getBytes(StandardCharsets.UTF_8),
            content2.getBytes(StandardCharsets.UTF_8));
    final Collection<PutColumn> secondColumns = Collections.singletonList(secondColumn);
    final PutFlowFile secondFlowFile =
            new PutFlowFile(tableName, row.getBytes(StandardCharsets.UTF_8), secondColumns, null);

    final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);

    // the HBase Table is mocked so the put operations can be inspected afterwards
    final Table mockTable = Mockito.mock(Table.class);
    when(mockTable.getName()).thenReturn(TableName.valueOf(tableName));

    // register the controller service with the runner, backed by the mocked table
    final HBaseClientService configuredService = configureHBaseClientService(runner, mockTable);
    runner.assertValid(configuredService);

    // push both flow files, which address the same row, in one call
    final HBaseClientService clientService = runner.getProcessContext()
            .getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
            .asControllerService(HBaseClientService.class);
    clientService.put(tableName, Arrays.asList(firstFlowFile, secondFlowFile));

    // the table must have seen exactly one put(...) invocation
    ArgumentCaptor<List> putsCaptor = ArgumentCaptor.forClass(List.class);
    verify(mockTable, times(1)).put(putsCaptor.capture());

    // that invocation must have carried exactly one Put
    final List<Put> capturedPuts = putsCaptor.getValue();
    assertEquals(1, capturedPuts.size());

    // and that single Put must contain both cells
    final NavigableMap<byte[], List<Cell>> cellsByFamily = capturedPuts.get(0).getFamilyCellMap();
    assertEquals(2, cellsByFamily.firstEntry().getValue().size());
}
/**
 * Submits two flow files that address the same row and column and verifies that the mocked table is called once,
 * with one {@code Put}, whose first column family contains two cells.
 */
@Test
public void testMultiplePutsSameRow() throws IOException, InitializationException {
    final String tableName = "nifi";
    final String row = "row1";
    final String columnFamily = "family1";
    final String columnQualifier = "qualifier1";
    final String content1 = "content1";
    final String content2 = "content2";

    // flow file #1 writes content1 to family1:qualifier1 of row1
    final PutColumn columnWithContent1 = new PutColumn(
            columnFamily.getBytes(StandardCharsets.UTF_8),
            columnQualifier.getBytes(StandardCharsets.UTF_8),
            content1.getBytes(StandardCharsets.UTF_8));
    final Collection<PutColumn> columnsForFirst = Collections.singletonList(columnWithContent1);
    final PutFlowFile flowFileOne =
            new PutFlowFile(tableName, row.getBytes(StandardCharsets.UTF_8), columnsForFirst, null);

    // flow file #2 writes content2 to the very same cell coordinates
    final PutColumn columnWithContent2 = new PutColumn(
            columnFamily.getBytes(StandardCharsets.UTF_8),
            columnQualifier.getBytes(StandardCharsets.UTF_8),
            content2.getBytes(StandardCharsets.UTF_8));
    final Collection<PutColumn> columnsForSecond = Collections.singletonList(columnWithContent2);
    final PutFlowFile flowFileTwo =
            new PutFlowFile(tableName, row.getBytes(StandardCharsets.UTF_8), columnsForSecond, null);

    final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);

    // a mocked HBase Table lets the test observe what gets written
    final Table hbaseTable = Mockito.mock(Table.class);
    when(hbaseTable.getName()).thenReturn(TableName.valueOf(tableName));

    // create the controller service on top of the mock and make sure it validates
    final HBaseClientService registeredService = configureHBaseClientService(runner, hbaseTable);
    runner.assertValid(registeredService);

    // look the service up the way a processor would and put both flow files at once
    final HBaseClientService serviceUnderTest = runner.getProcessContext()
            .getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
            .asControllerService(HBaseClientService.class);
    serviceUnderTest.put(tableName, Arrays.asList(flowFileOne, flowFileTwo));

    // exactly one call to Table.put(List)
    ArgumentCaptor<List> listCaptor = ArgumentCaptor.forClass(List.class);
    verify(hbaseTable, times(1)).put(listCaptor.capture());

    // exactly one Put inside that list
    final List<Put> putList = listCaptor.getValue();
    assertEquals(1, putList.size());

    // two cells inside that Put's first column family
    final NavigableMap<byte[], List<Cell>> familyToCells = putList.get(0).getFamilyCellMap();
    assertEquals(2, familyToCells.firstEntry().getValue().size());
}
/**
 * Puts two flow files with identical row, family and qualifier but different content, then asserts that the
 * mocked table saw one put call with one {@code Put} whose first column family has two cells.
 */
@Test
public void testMultiplePutsSameRow() throws IOException, InitializationException {
    final String tableName = "nifi";
    final String row = "row1";
    final String columnFamily = "family1";
    final String columnQualifier = "qualifier1";
    final String content1 = "content1";
    final String content2 = "content2";

    // build the flow file that carries content1
    final PutColumn colA = new PutColumn(
            columnFamily.getBytes(StandardCharsets.UTF_8),
            columnQualifier.getBytes(StandardCharsets.UTF_8),
            content1.getBytes(StandardCharsets.UTF_8));
    final Collection<PutColumn> colsA = Collections.singletonList(colA);
    final PutFlowFile flowFileA =
            new PutFlowFile(tableName, row.getBytes(StandardCharsets.UTF_8), colsA, null);

    // build the flow file that carries content2 for the same row and column
    final PutColumn colB = new PutColumn(
            columnFamily.getBytes(StandardCharsets.UTF_8),
            columnQualifier.getBytes(StandardCharsets.UTF_8),
            content2.getBytes(StandardCharsets.UTF_8));
    final Collection<PutColumn> colsB = Collections.singletonList(colB);
    final PutFlowFile flowFileB =
            new PutFlowFile(tableName, row.getBytes(StandardCharsets.UTF_8), colsB, null);

    final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);

    // mock the HBase Table so the resulting put operations can be verified
    final Table tableMock = Mockito.mock(Table.class);
    when(tableMock.getName()).thenReturn(TableName.valueOf(tableName));

    // wire the controller service to the runner and check that it is valid
    final HBaseClientService wiredService = configureHBaseClientService(runner, tableMock);
    runner.assertValid(wiredService);

    // issue a single put request containing both flow files
    final HBaseClientService resolvedService = runner.getProcessContext()
            .getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
            .asControllerService(HBaseClientService.class);
    resolvedService.put(tableName, Arrays.asList(flowFileA, flowFileB));

    // Table.put must have been invoked once
    ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
    verify(tableMock, times(1)).put(captor.capture());

    // with a list holding one Put
    final List<Put> sentPuts = captor.getValue();
    assertEquals(1, sentPuts.size());

    // whose first column family holds two cells
    final NavigableMap<byte[], List<Cell>> cellsPerFamily = sentPuts.get(0).getFamilyCellMap();
    assertEquals(2, cellsPerFamily.firstEntry().getValue().size());
}
/**
 * Verifies that the state in {@link PersistentIndex} is the same as the one in {@link #referenceIndex}.
 * <p>
 * The check proceeds in three steps:
 * <ol>
 *   <li>the log segment count is 3 when segmented and 1 otherwise;</li>
 *   <li>the real index segments are walked in lockstep with the reference index segments, and for every id in a
 *   reference segment the values found in the real segment are compared one by one with the reference values;</li>
 *   <li>the journal entries starting from the start offset of the last reference index segment are compared, in
 *   order, with the entries of {@code logOrder} from that offset onwards.</li>
 * </ol>
 * @param isLogSegmented {@code true} if segmented. {@code false} otherwise.
 * @throws StoreException
 */
private void verifyState(boolean isLogSegmented) throws StoreException {
  verifyRealIndexSanity();
  assertEquals("Incorrect log segment count", isLogSegmented ? 3 : 1, index.getLogSegmentCount());
  NavigableMap<Offset, IndexSegment> realIndex = index.getIndexSegments();
  assertEquals("Number of index segments does not match expected", referenceIndex.size(), realIndex.size());
  // realIndexEntry is advanced manually, once per reference segment, so both maps are traversed in lockstep
  Map.Entry<Offset, IndexSegment> realIndexEntry = realIndex.firstEntry();
  for (Map.Entry<Offset, TreeMap<MockId, TreeSet<IndexValue>>> referenceIndexEntry : referenceIndex.entrySet()) {
    assertEquals("Offset of index segment does not match expected", referenceIndexEntry.getKey(),
        realIndexEntry.getKey());
    TreeMap<MockId, TreeSet<IndexValue>> referenceIndexSegment = referenceIndexEntry.getValue();
    IndexSegment realIndexSegment = realIndexEntry.getValue();
    for (Map.Entry<MockId, TreeSet<IndexValue>> referenceIndexSegmentEntry : referenceIndexSegment.entrySet()) {
      MockId id = referenceIndexSegmentEntry.getKey();
      NavigableSet<IndexValue> referenceValues = referenceIndexSegmentEntry.getValue();
      NavigableSet<IndexValue> values = realIndexSegment.find(id);
      assertNotNull("No values returned from real index segment for " + id, values);
      assertTrue("No values returned from real index segment for " + id, values.size() > 0);
      // value walks the real set while the loop below walks the reference set
      IndexValue value = values.first();
      for (IndexValue referenceValue : referenceValues) {
        // values.higher(...) returns null once the real set is exhausted; fail with a clear message instead of
        // an NPE when the real index holds fewer values than the reference
        assertNotNull("There are fewer values in the real index than expected for " + id, value);
        assertEquals("Offset does not match", referenceValue.getOffset(), value.getOffset());
        assertEquals("ExpiresAtMs does not match", referenceValue.getExpiresAtMs(), value.getExpiresAtMs());
        assertEquals("Size does not match", referenceValue.getSize(), value.getSize());
        assertEquals("Account ID does not match", referenceValue.getAccountId(), value.getAccountId());
        assertEquals("Container ID does not match", referenceValue.getContainerId(), value.getContainerId());
        assertEquals(
            "Original message offset does not match " + id.toString() + " " + referenceValue + " " + value.toString(),
            referenceValue.getOriginalMessageOffset(), value.getOriginalMessageOffset());
        assertEquals("Flags do not match " + id.toString() + " " + referenceValue.toString() + " " + value.toString(),
            referenceValue.getFlags(), value.getFlags());
        // operation time and raw bytes are only compared when the hard deleter is enabled and the key is not
        // among the deleted keys
        if (index.hardDeleter.enabled.get() && !deletedKeys.contains(referenceIndexSegmentEntry.getKey())) {
          assertEquals("Operation time does not match", referenceValue.getOperationTimeInMs(),
              value.getOperationTimeInMs());
          assertEquals("Value from IndexSegment does not match expected", referenceValue.getBytes(),
              value.getBytes());
        }
        value = values.higher(value);
      }
      // every reference value has been matched; anything left over is unexpected
      assertNull("There are more values in the real index", value);
    }
    realIndexEntry = realIndex.higherEntry(realIndexEntry.getKey());
  }
  assertNull("There should no more index segments left", realIndexEntry);
  // all the elements in the last segment should be in the journal
  assertNotNull("There is no offset in the log that corresponds to the last index segment start offset",
      logOrder.get(referenceIndex.lastKey()));
  Map.Entry<Offset, Pair<MockId, LogEntry>> logEntry = logOrder.floorEntry(referenceIndex.lastKey());
  List<JournalEntry> entries = index.journal.getEntriesSince(referenceIndex.lastKey(), true);
  // walk the journal entries and the reference log side by side from that offset
  for (JournalEntry entry : entries) {
    assertNotNull("There are no more entries in the reference log but there are entries in the journal", logEntry);
    assertEquals("Offset in journal not as expected", logEntry.getKey(), entry.getOffset());
    assertEquals("Key in journal not as expected", logEntry.getValue().getFirst(), entry.getKey());
    logEntry = logOrder.higherEntry(logEntry.getKey());
  }
  assertNull("There should be no more entries in the reference log", logEntry);
}