下面列出了javax.annotation.Nonnegative#org.rocksdb.RocksDB 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public RocksFullSnapshotStrategy(
@Nonnull RocksDB db,
@Nonnull ResourceGuard rocksDBResourceGuard,
@Nonnull TypeSerializer<K> keySerializer,
@Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
@Nonnull KeyGroupRange keyGroupRange,
@Nonnegative int keyGroupPrefixBytes,
@Nonnull LocalRecoveryConfig localRecoveryConfig,
@Nonnull CloseableRegistry cancelStreamRegistry,
@Nonnull StreamCompressionDecorator keyGroupCompressionDecorator) {
super(
DESCRIPTION,
db,
rocksDBResourceGuard,
keySerializer,
kvStateInformation,
keyGroupRange,
keyGroupPrefixBytes,
localRecoveryConfig,
cancelStreamRegistry);
this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
}
public static void init(final String dbPath) {
try {
final long start = System.currentTimeMillis();
boolean result = FileIOUtils.createOrExistsDir(new File(dbPath));
assert(result != false);
result = FileIOUtils.createOrExistsDir(new File(DB_PATH_BACKUP));
assert(result != false);
result = FileIOUtils.createOrExistsDir(new File(DB_PATH_RESTORE));
assert(result != false);
DB = RocksDB.open(OptionsConfig.DB_OPTIONS, dbPath, CF_DESCRIPTORS, CF_HANDLES);
assert (DB != null);
initCFManger(CF_HANDLES);
final long cost = System.currentTimeMillis() - start;
LOGGER.info("succ open rocksdb, path:{}, cost:{}ms", dbPath, cost);
} catch (RocksDBException e) {
LOGGER.error("error while open rocksdb, path:{}, err:{}", dbPath, e.getMessage(), e);
}
}
private RocksDB openDB(String path, List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {
List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1 + stateColumnFamilyDescriptors.size());
// we add the required descriptor for the default CF in FIRST position,
// see
// https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, colOptions));
columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
try {
return RocksDB.open(
dbOptions,
Preconditions.checkNotNull(path),
columnFamilyDescriptors,
stateColumnFamilyHandles);
} catch (RocksDBException e) {
throw new IOException("Error while opening RocksDB instance.", e);
}
}
@Test
public void readFromFileInvalidConfig() throws IOException {
final List<String> families =
Arrays.asList(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
"First", "Second", "Third",
"Fourth", "Fifth",
"Sixth");
final List<ColumnFamilyDescriptor> columnFamilyDescriptors =
new ArrayList<>();
for (String family : families) {
columnFamilyDescriptors.add(
new ColumnFamilyDescriptor(family.getBytes(StandardCharsets.UTF_8),
new ColumnFamilyOptions()));
}
final DBOptions options = DBConfigFromFile.readFromFile("badfile.db.ini",
columnFamilyDescriptors);
// This has to return a Null, since we have config defined for badfile.db
Assert.assertNull(options);
}
/**
* Helper to load managed column family descriptors.
*/
private List<ColumnFamilyDescriptor> loadManagedColumnFamilies(DBOptions dbOptions) throws RocksDBException {
final List<ColumnFamilyDescriptor> managedColumnFamilies = new ArrayList<>();
final Options options = new Options(dbOptions, new ColumnFamilyOptions());
List<byte[]> existing = RocksDB.listColumnFamilies(options, rocksDBBasePath);
if (existing.isEmpty()) {
LOG.info("No column family found. Loading default");
managedColumnFamilies.add(getColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
} else {
LOG.info("Loading column families :" + existing.stream().map(String::new).collect(Collectors.toList()));
managedColumnFamilies
.addAll(existing.stream().map(RocksDBDAO::getColumnFamilyDescriptor).collect(Collectors.toList()));
}
return managedColumnFamilies;
}
protected void buildIndex(Pair<RocksDB, Map<COLUMN_FAMILIES, ColumnFamilyHandle>> dbAndHandles, List<File> rdfFiles)
throws RocksDBException, ClassNotFoundException, IOException {
LOGGER.info("Building RocksDB index of data in RDF files");
RDFParser parser = Rio.createParser(RDFFormat.TURTLE);
LOGGER.info("Processing %d RDF files", rdfFiles.size());
for (File rdfFile : rdfFiles) {
LOGGER.info("Processing file %s", rdfFile.getAbsolutePath());
AbstractRDFHandler handler = PC_RDF_DATA_FILE_CONFIG.makeHandlerForDataFile(dbAndHandles, rdfFile);
if (handler == null) {
LOGGER.info("Skipping file without defined handler: %s", rdfFile.getAbsolutePath());
continue;
}
parser.setRDFHandler(handler);
parser.parse(new GZIPInputStream(new FileInputStream(rdfFile)), "");
LOGGER.info("Successfully parsed file at %s", rdfFile.getAbsolutePath());
}
LOGGER.info("Done processing RDF files");
}
RocksDBPriorityQueueSetFactory(
KeyGroupRange keyGroupRange,
int keyGroupPrefixBytes,
int numberOfKeyGroups,
Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
RocksDB db,
RocksDBWriteBatchWrapper writeBatchWrapper,
RocksDBNativeMetricMonitor nativeMetricMonitor,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) {
this.keyGroupRange = keyGroupRange;
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.numberOfKeyGroups = numberOfKeyGroups;
this.kvStateInformation = kvStateInformation;
this.db = db;
this.writeBatchWrapper = writeBatchWrapper;
this.nativeMetricMonitor = nativeMetricMonitor;
this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
this.sharedElementOutView = new DataOutputSerializer(128);
this.sharedElementInView = new DataInputDeserializer();
}
/**
* Test that {@code ByteStoreManager#start()} waits until RocksDB lock
* is released
*/
@Test
public void testConcurrentOpenSleep() throws Exception {
String dbPath = temporaryFolder.newFolder().getAbsolutePath();
try(ByteStoreManager bsm = new ByteStoreManager(dbPath, false)) {
TestThread tt = new TestThread(bsm);
try(RocksDB db = RocksDB.open(new File(dbPath, ByteStoreManager.CATALOG_STORE_NAME).getAbsolutePath());
ColumnFamilyHandle handle = db.getDefaultColumnFamily()) {
tt.start();
tt.ready.countDown();
// Wait for multiple attempts
TimeUnit.MILLISECONDS.sleep(300);
// Lock should still be in place
assertEquals(1, tt.started.getCount());
assertFalse("RocksDB lock didn't work", tt.result.get());
}
// RocksDB is now closed, lock should be freed
tt.started.await();
assertTrue("RocksDB lock not released properly", tt.result.get());
}
}
public void build(ILookupTable srcLookupTable) {
File dbFolder = new File(dbPath);
if (dbFolder.exists()) {
logger.info("remove rocksdb folder:{} to rebuild table cache:{}", dbPath, tableDesc.getIdentity());
FileUtils.deleteQuietly(dbFolder);
} else {
logger.info("create new rocksdb folder:{} for table cache:{}", dbPath, tableDesc.getIdentity());
dbFolder.mkdirs();
}
logger.info("start to build lookup table:{} to rocks db:{}", tableDesc.getIdentity(), dbPath);
try (RocksDB rocksDB = RocksDB.open(options, dbPath)) {
// todo use batch may improve write performance
for (String[] row : srcLookupTable) {
KV kv = encoder.encode(row);
rocksDB.put(kv.getKey(), kv.getValue());
}
} catch (RocksDBException e) {
logger.error("error when put data to rocksDB", e);
throw new RuntimeException("error when write data to rocks db", e);
}
logger.info("source table:{} has been written to rocks db:{}", tableDesc.getIdentity(), dbPath);
}
public RocksDBSnapshotStrategyBase(
@Nonnull String description,
@Nonnull RocksDB db,
@Nonnull ResourceGuard rocksDBResourceGuard,
@Nonnull TypeSerializer<K> keySerializer,
@Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
@Nonnull KeyGroupRange keyGroupRange,
@Nonnegative int keyGroupPrefixBytes,
@Nonnull LocalRecoveryConfig localRecoveryConfig,
@Nonnull CloseableRegistry cancelStreamRegistry) {
super(description);
this.db = db;
this.rocksDBResourceGuard = rocksDBResourceGuard;
this.keySerializer = keySerializer;
this.kvStateInformation = kvStateInformation;
this.keyGroupRange = keyGroupRange;
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.localRecoveryConfig = localRecoveryConfig;
this.cancelStreamRegistry = cancelStreamRegistry;
}
/**
* Construct the <code>RocksDbKeyValueReader</code> with store's name,
* database's path and Samza's config
*
* @param storeName name of the RocksDb defined in the config file
* @param dbPath path to the db directory
* @param config Samza's config
*/
public RocksDbKeyValueReader(String storeName, String dbPath, Config config) {
// get the key serde and value serde from the config
StorageConfig storageConfig = new StorageConfig(config);
SerializerConfig serializerConfig = new SerializerConfig(config);
keySerde = getSerdeFromName(storageConfig.getStorageKeySerde(storeName).orElse(null), serializerConfig);
valueSerde = getSerdeFromName(storageConfig.getStorageMsgSerde(storeName).orElse(null), serializerConfig);
// get db options
Options options = RocksDbOptionsHelper.options(config, 1, new File(dbPath), StorageEngineFactory.StoreMode.ReadWrite);
// open the db
RocksDB.loadLibrary();
try {
db = RocksDB.openReadOnly(options, dbPath);
} catch (RocksDBException e) {
throw new SamzaException("can not open the rocksDb in " + dbPath, e);
}
}
public AbstractRocksDbHashtable(String path) {
File file = new File(path);
if (!file.exists()) {
file.mkdirs();
}
this.options = new Options();
this.options.setCreateIfMissing(true);
try {
this.db = RocksDB.open(options, path);
} catch (RocksDBException e) {
String message = String.format(
"Failed to open/create RocksDB database at %s. Error code: %s",
path, e.getStatus().getCodeString());
throw new RuntimeException(message, e);
}
}
public static void main(String[] args) throws Exception {
RocksDB.loadLibrary();
DB db = DB.build("D:\\temp\\db", false);
db.getList().add("hello", "i".getBytes());
db.getList().add("hello2", "i".getBytes());
db.getList().add("hello21", "i".getBytes());
db.getList().add("some1", "i".getBytes());
db.getList().delete("LPOP_LIST");
try (KeyIterator keyIterator = db.getList().getKeyIterator()) {
while (keyIterator.hasNext()) {
String key = keyIterator.next();
System.out.println(key);
}
}
}
public static AbstractRDFHandler makeHandlerForDataFile(
Pair<RocksDB, Map<COLUMN_FAMILIES, ColumnFamilyHandle>> dbAndHandles, File file) {
PC_RDF_DATA_FILE_CONFIG config = getDataTypeForFile(file);
if (config == null) {
LOGGER.info("No handler config found for file %s", file.getAbsolutePath());
return null;
}
LOGGER.info("Selected handler type %s for file %s", config.name(), file.getName());
return new PCRDFHandler(
dbAndHandles,
config.columnFamily,
config.keyType,
config.valType,
config.reverseSubjectAndObject,
config.valueTransformer
);
}
public RocksDBSnapshotStrategyBase(
@Nonnull String description,
@Nonnull RocksDB db,
@Nonnull ResourceGuard rocksDBResourceGuard,
@Nonnull TypeSerializer<K> keySerializer,
@Nonnull LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
@Nonnull KeyGroupRange keyGroupRange,
@Nonnegative int keyGroupPrefixBytes,
@Nonnull LocalRecoveryConfig localRecoveryConfig,
@Nonnull CloseableRegistry cancelStreamRegistry) {
super(description);
this.db = db;
this.rocksDBResourceGuard = rocksDBResourceGuard;
this.keySerializer = keySerializer;
this.kvStateInformation = kvStateInformation;
this.keyGroupRange = keyGroupRange;
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.localRecoveryConfig = localRecoveryConfig;
this.cancelStreamRegistry = cancelStreamRegistry;
}
private void verifyRocksObjectsReleased() {
//Ensure every RocksObject was closed exactly once
for (RocksObject rocksCloseable : allCreatedCloseables) {
verify(rocksCloseable, times(1)).close();
}
assertNotNull(null, keyedStateBackend.db);
RocksDB spyDB = keyedStateBackend.db;
if (!enableIncrementalCheckpointing) {
verify(spyDB, times(1)).getSnapshot();
verify(spyDB, times(1)).releaseSnapshot(any(Snapshot.class));
}
keyedStateBackend.dispose();
verify(spyDB, times(1)).close();
assertEquals(true, keyedStateBackend.isDisposed());
}
/**
* Tests that {@link RocksDBWriteBatchWrapper} flushes after the memory consumed exceeds the preconfigured value.
*/
@Test
public void testWriteBatchWrapperFlushAfterMemorySizeExceed() throws Exception {
try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
WriteOptions options = new WriteOptions().setDisableWAL(true);
ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200, 50)) {
long initBatchSize = writeBatchWrapper.getDataSize();
byte[] dummy = new byte[6];
ThreadLocalRandom.current().nextBytes(dummy);
// will add 1 + 1 + 1 + 6 + 1 + 6 = 16 bytes for each KV
// format is [handleType|kvType|keyLen|key|valueLen|value]
// more information please ref write_batch.cc in RocksDB
writeBatchWrapper.put(handle, dummy, dummy);
assertEquals(initBatchSize + 16, writeBatchWrapper.getDataSize());
writeBatchWrapper.put(handle, dummy, dummy);
assertEquals(initBatchSize + 32, writeBatchWrapper.getDataSize());
writeBatchWrapper.put(handle, dummy, dummy);
// will flush all, then an empty write batch
assertEquals(initBatchSize, writeBatchWrapper.getDataSize());
}
}
public CloseableIterator<GeoWaveRow> dataIndexIterator(final byte[][] dataIds) {
if (dataIds == null || dataIds.length == 0) {
return new CloseableIterator.Empty<>();
}
final RocksDB readDb = getReadDb();
if (readDb == null) {
return new CloseableIterator.Empty<>();
}
try {
final List<byte[]> dataIdsList = Arrays.asList(dataIds);
final Map<byte[], byte[]> dataIdxResults = readDb.multiGet(dataIdsList);
return new CloseableIterator.Wrapper(
dataIdsList.stream().filter(dataId -> dataIdxResults.containsKey(dataId)).map(
dataId -> DataIndexUtils.deserializeDataIndexRow(
dataId,
adapterId,
dataIdxResults.get(dataId),
visibilityEnabled)).iterator());
} catch (final RocksDBException e) {
LOGGER.error("Unable to get values by data ID", e);
}
return new CloseableIterator.Empty<>();
}
@Test
public void testTempLibFolderDeletedOnFail() throws Exception {
PowerMockito.spy(RocksDB.class);
PowerMockito.when(RocksDB.class, "loadLibrary").thenThrow(new ExpectedTestException());
File tempFolder = temporaryFolder.newFolder();
try {
RocksDBStateBackend.ensureRocksDBIsLoaded(tempFolder.getAbsolutePath());
fail("Not throwing expected exception.");
} catch (IOException ignored) {
// ignored
}
File[] files = tempFolder.listFiles();
Assert.assertNotNull(files);
Assert.assertEquals(0, files.length);
}
@Test
public void testAddWithVisibility() throws RocksDBException {
final RocksDB db = Mockito.mock(RocksDB.class);
final RocksDBMetadataTable metadataTable = new RocksDBMetadataTable(db, false, true, false);
final byte[] primaryId = new byte[] {4};
final byte[] secondaryId = new byte[] {2};
final byte[] value = new byte[] {123};
final GeoWaveMetadata metadata1 = new GeoWaveMetadata(primaryId, secondaryId, null, value);
metadataTable.add(metadata1);
Mockito.verify(db).put(new byte[] {4, 2, 0, 1}, value);
final byte[] visibility = new byte[] {6};
final GeoWaveMetadata metadata2 =
new GeoWaveMetadata(primaryId, secondaryId, visibility, value);
metadataTable.add(metadata2);
Mockito.verify(db).put(new byte[] {4, 2, 6, 1, 1}, value);
}
@Override
public BlockHeaderStorage newBlockHeaderStorage(String path) {
RocksDBBlockHeaderStorage rocksDBBlockHeaderStorage = new RocksDBBlockHeaderStorage();
Options options = new Options();
options.setCreateIfMissing(true);
options.setCreateMissingColumnFamilies(true);
String dbPath = basePath + "/" + path;
try {
File dir = new File(dbPath);
if (!dir.exists()) {
dir.mkdirs();
} else {
if (!dir.isDirectory()) {
logger.error("File {} exists and isn't dir", dbPath);
}
}
RocksDB rocksDB = RocksDB.open(options, dbPath);
rocksDBBlockHeaderStorage.setRocksDB(rocksDB);
} catch (RocksDBException e) {
logger.error("RocksDB open failed", e);
}
return rocksDBBlockHeaderStorage;
}
@Override
protected void before() throws Throwable {
this.temporaryFolder = new TemporaryFolder();
this.temporaryFolder.create();
final File rocksFolder = temporaryFolder.newFolder();
this.dbOptions = optionsFactory.createDBOptions(
PredefinedOptions.DEFAULT.createDBOptions(handlesToClose), handlesToClose).setCreateIfMissing(true);
this.columnFamilyOptions = optionsFactory.createColumnOptions(
PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose), handlesToClose);
this.writeOptions = new WriteOptions();
this.writeOptions.disableWAL();
this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
this.columnFamilyHandles = new ArrayList<>(1);
this.rocksDB = RocksDB.open(
dbOptions,
rocksFolder.getAbsolutePath(),
Collections.singletonList(new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions)),
columnFamilyHandles);
this.batchWrapper = new RocksDBWriteBatchWrapper(rocksDB, writeOptions);
}
RocksDBPriorityQueueSetFactory(
KeyGroupRange keyGroupRange,
int keyGroupPrefixBytes,
int numberOfKeyGroups,
Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
RocksDB db,
ReadOptions readOptions,
RocksDBWriteBatchWrapper writeBatchWrapper,
RocksDBNativeMetricMonitor nativeMetricMonitor,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) {
this.keyGroupRange = keyGroupRange;
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.numberOfKeyGroups = numberOfKeyGroups;
this.kvStateInformation = kvStateInformation;
this.db = db;
this.readOptions = readOptions;
this.writeBatchWrapper = writeBatchWrapper;
this.nativeMetricMonitor = nativeMetricMonitor;
this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
this.sharedElementOutView = new DataOutputSerializer(128);
this.sharedElementInView = new DataInputDeserializer();
}
@Test
public void testPathExceptionOnWindows() throws Exception {
assumeTrue(OperatingSystem.isWindows());
final File folder = TMP_DIR.newFolder();
final File rocksDir = new File(folder, getLongString(247 - folder.getAbsolutePath().length()));
Files.createDirectories(rocksDir.toPath());
try (DBOptions dbOptions = new DBOptions().setCreateIfMissing(true);
ColumnFamilyOptions colOptions = new ColumnFamilyOptions()) {
RocksDB rocks = RocksDBOperationUtils.openDB(
rocksDir.getAbsolutePath(),
Collections.emptyList(),
Collections.emptyList(),
colOptions, dbOptions);
rocks.close();
// do not provoke a test failure if this passes, because some setups may actually
// support long paths, in which case: great!
}
catch (IOException e) {
assertThat(e.getMessage(), containsString("longer than the directory path length limit for Windows"));
}
}
@Override
public synchronized DBMap openMap(String name) throws Exception
{
RocksDB db = null;
if (use_separate_dbs)
{
if (separate_db_map.containsKey(name)) db = separate_db_map.get(name);
else
{
File p = new File(base_path, name);
p.mkdirs();
db = openRocksDB(p.getPath());
separate_db_map.put(name, db);
}
}
else
{
db = shared_db;
}
return new RocksDBMap(this, db, name);
}
@Override
public void init() {
Options options = getOptions().setCreateIfMissing(true);
try {
String dataPath = dataSegmentFolder.getAbsolutePath() + "/data";
db = RocksDB.open(options, dataPath);
} catch (RocksDBException e) {
logger.error("init rocks db fail");
}
}
RocksDBMapIterator(
final RocksDB db,
final byte[] keyPrefixBytes,
final TypeSerializer<UK> keySerializer,
final TypeSerializer<UV> valueSerializer,
DataInputDeserializer dataInputView) {
this.db = db;
this.keyPrefixBytes = keyPrefixBytes;
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.dataInputView = dataInputView;
}
/**
* Read DB and return existing column families.
* @return List of column families
* @throws RocksDBException on Error.
*/
private List<TableConfig> getColumnFamiliesInExistingDb()
throws RocksDBException {
List<byte[]> bytes = RocksDB.listColumnFamilies(new Options(),
dbLocation.getAbsolutePath());
List<TableConfig> columnFamiliesInDb = bytes.stream()
.map(cfbytes -> new TableConfig(StringUtils.bytes2String(cfbytes),
DBStoreBuilder.HDDS_DEFAULT_DB_PROFILE.getColumnFamilyOptions()))
.collect(Collectors.toList());
if (LOG.isDebugEnabled()) {
LOG.debug("Found column Families in DB : {}",
columnFamiliesInDb);
}
return columnFamiliesInDb;
}
/**
* Not strictly a unit test. Just a confirmation of the expected behavior
* of RocksDB keyMayExist API.
* Expected behavior - On average, keyMayExist latency < key.get() latency
* for invalid keys.
* @throws Exception if unable to read from RocksDB.
*/
@Test
public void testRocksDBKeyMayExistApi() throws Exception {
try (RDBStore newStore =
new RDBStore(folder.newFolder(), options, configSet)) {
RocksDB db = newStore.getDb();
//Test with 50 invalid keys.
long start = System.nanoTime();
for (int i = 0; i < 50; i++) {
Assert.assertTrue(db.get(
org.apache.commons.codec.binary.StringUtils
.getBytesUtf16("key" + i)) == null);
}
long end = System.nanoTime();
long keyGetLatency = end - start;
start = System.nanoTime();
for (int i = 0; i < 50; i++) {
Assert.assertFalse(db.keyMayExist(
org.apache.commons.codec.binary.StringUtils
.getBytesUtf16("key" + i), null));
}
end = System.nanoTime();
long keyMayExistLatency = end - start;
Assert.assertTrue(keyMayExistLatency < keyGetLatency);
}
}
public void list() throws Exception {
String crawldbPath = FilenameUtils.concat(crawlPath, "crawldb");
RocksDB crawldbDatabase = RocksDBUtils.open(crawldbPath);
RocksIterator crawldbIterator = crawldbDatabase.newIterator();
for(crawldbIterator.seekToFirst(); crawldbIterator.isValid(); crawldbIterator.next()){
CrawlDatum datum = RocksDBUtils.createCrawlDatum(crawldbIterator.key(), crawldbIterator.value());
System.out.println(CrawlDatumFormater.datumToString(datum));
}
crawldbDatabase.close();
}