From 018fd778d4da743cbd005a233b4d8e00164aaca4 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sun, 27 Oct 2024 08:01:37 +0100 Subject: [PATCH 1/3] expose memtable flush This can be used to avoid log replays on startup, for example --- rocksdb/rocksdb.nim | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/rocksdb/rocksdb.nim b/rocksdb/rocksdb.nim index 879a7f0..3921985 100644 --- a/rocksdb/rocksdb.nim +++ b/rocksdb/rocksdb.nim @@ -465,6 +465,22 @@ proc releaseSnapshot*(db: RocksDbRef, snapshot: SnapshotRef) = rocksdb_release_snapshot(db.cPtr, snapshot.cPtr) snapshot.setClosed() +proc flush*(db: RocksDbRef, cfs: openArray[ColFamilyHandleRef]): RocksDBResult[void] = + withLock(db.lock): + if not db.isClosed(): + var cfs = cfs.mapIt(it.cPtr) + var errors: cstring + var opts = rocksdb_flushoptions_create() + defer: + rocksdb_flushoptions_destroy(opts) + rocksdb_flushoptions_set_wait(opts, 1) + rocksdb_flush_cfs( + db.cPtr, opts, addr cfs[0], cint(cfs.len), cast[cstringArray](errors.addr) + ) + bailOnErrors(errors) + + ok() + proc close*(db: RocksDbRef) = ## Close the `RocksDbRef` which will release the connection to the database ## and free the memory associated with it. `close` is idempotent and can From 22297a47e51eb044c2a975fbb0c06cc95082da8f Mon Sep 17 00:00:00 2001 From: bhartnett <51288821+bhartnett@users.noreply.github.com> Date: Mon, 28 Oct 2024 20:09:04 +0800 Subject: [PATCH 2/3] Add flush to RocksDbReadWriteRef. --- rocksdb/options/dbopts.nim | 3 +++ rocksdb/rocksdb.nim | 53 ++++++++++++++++++++++++++++---------- tests/test_rocksdb.nim | 15 +++++++++++ 3 files changed, 58 insertions(+), 13 deletions(-) diff --git a/rocksdb/options/dbopts.nim b/rocksdb/options/dbopts.nim index 3bc3ae3..b23af70 100644 --- a/rocksdb/options/dbopts.nim +++ b/rocksdb/options/dbopts.nim @@ -111,6 +111,9 @@ proc defaultDbOptions*(autoClose = false): DbOptionsRef = # Enable creating column families if they do not exist dbOpts.createMissingColumnFamilies = true + # Make sure flush is atomic accross column families + dbOpts.atomicFlush = true + # Options recommended by rocksdb devs themselves, for new databases # https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options diff --git a/rocksdb/rocksdb.nim b/rocksdb/rocksdb.nim index 3921985..77a3d41 100644 --- a/rocksdb/rocksdb.nim +++ b/rocksdb/rocksdb.nim @@ -41,6 +41,7 @@ export type RocksDbPtr* = ptr rocksdb_t IngestExternalFilesOptionsPtr = ptr rocksdb_ingestexternalfileoptions_t + FlushOptionsPtr = ptr rocksdb_flushoptions_t RocksDbRef* = ref object of RootObj lock: Lock @@ -57,6 +58,7 @@ type RocksDbReadWriteRef* = ref object of RocksDbRef writeOpts: WriteOptionsRef ingestOptsPtr: IngestExternalFilesOptionsPtr + flushOptsPtr: FlushOptionsPtr proc listColumnFamilies*(path: string): RocksDBResult[seq[string]] = ## List exisiting column families on disk. This might be used to find out @@ -135,6 +137,9 @@ proc openRocksDb*( autoCloseNonNil(writeOpts) autoCloseAll(cfs) + let flushOptsPtr = rocksdb_flushoptions_create() + rocksdb_flushoptions_set_wait(flushOptsPtr, 1) + let cfTable = newColFamilyTable(cfNames.mapIt($it), cfHandles) db = RocksDbReadWriteRef( @@ -144,6 +149,7 @@ proc openRocksDb*( dbOpts: dbOpts, readOpts: readOpts, writeOpts: writeOpts, + flushOptsPtr: flushOptsPtr, cfDescriptors: cfs, ingestOptsPtr: rocksdb_ingestexternalfileoptions_create(), defaultCfHandle: cfTable.get(DEFAULT_COLUMN_FAMILY_NAME), @@ -465,19 +471,37 @@ proc releaseSnapshot*(db: RocksDbRef, snapshot: SnapshotRef) = rocksdb_release_snapshot(db.cPtr, snapshot.cPtr) snapshot.setClosed() -proc flush*(db: RocksDbRef, cfs: openArray[ColFamilyHandleRef]): RocksDBResult[void] = - withLock(db.lock): - if not db.isClosed(): - var cfs = cfs.mapIt(it.cPtr) - var errors: cstring - var opts = rocksdb_flushoptions_create() - defer: - rocksdb_flushoptions_destroy(opts) - rocksdb_flushoptions_set_wait(opts, 1) - rocksdb_flush_cfs( - db.cPtr, opts, addr cfs[0], cint(cfs.len), cast[cstringArray](errors.addr) - ) - bailOnErrors(errors) +proc flush*( + db: RocksDbReadWriteRef, cfHandle = db.defaultCfHandle +): RocksDBResult[void] = + ## Flush all memory table data for the given column family. + doAssert not db.isClosed() + + var errors: cstring + rocksdb_flush_cf( + db.cPtr, db.flushOptsPtr, cfHandle.cPtr, cast[cstringArray](errors.addr) + ) + bailOnErrors(errors) + + ok() + +proc flush*( + db: RocksDbReadWriteRef, cfHandles: openArray[ColFamilyHandleRef] +): RocksDBResult[void] = + ## Flush all memory table data for the given column families. + doAssert not db.isClosed() + + var + cfs = cfHandles.mapIt(it.cPtr) + errors: cstring + rocksdb_flush_cfs( + db.cPtr, + db.flushOptsPtr, + addr cfs[0], + cint(cfs.len), + cast[cstringArray](errors.addr), + ) + bailOnErrors(errors) ok() @@ -506,3 +530,6 @@ proc close*(db: RocksDbRef) = rocksdb_ingestexternalfileoptions_destroy(db.ingestOptsPtr) db.ingestOptsPtr = nil + + rocksdb_flushoptions_destroy(db.flushOptsPtr) + db.flushOptsPtr = nil diff --git a/tests/test_rocksdb.nim b/tests/test_rocksdb.nim index 21cbde9..f89d3e4 100644 --- a/tests/test_rocksdb.nim +++ b/tests/test_rocksdb.nim @@ -499,3 +499,18 @@ suite "RocksDbRef Tests": db.releaseSnapshot(snapshot) check snapshot.isClosed() + + test "Test flush": + check: + db.put(key, val).isOk() + db.flush().isOk() + + check: + db.put(otherKey, val, otherCfHandle).isOk() + db.flush(otherCfHandle).isOk() + + let cfHandles = [defaultCfHandle, otherCfHandle] + check: + db.put(otherKey, val, defaultCfHandle).isOk() + db.put(key, val, otherCfHandle).isOk() + db.flush(cfHandles).isOk() From 208fbd040f6ec89556a49cd236d8251b938ca57b Mon Sep 17 00:00:00 2001 From: bhartnett <51288821+bhartnett@users.noreply.github.com> Date: Mon, 28 Oct 2024 20:39:18 +0800 Subject: [PATCH 3/3] Remove atomic flush default. --- rocksdb/options/dbopts.nim | 3 --- 1 file changed, 3 deletions(-) diff --git a/rocksdb/options/dbopts.nim b/rocksdb/options/dbopts.nim index b23af70..3bc3ae3 100644 --- a/rocksdb/options/dbopts.nim +++ b/rocksdb/options/dbopts.nim @@ -111,9 +111,6 @@ proc defaultDbOptions*(autoClose = false): DbOptionsRef = # Enable creating column families if they do not exist dbOpts.createMissingColumnFamilies = true - # Make sure flush is atomic accross column families - dbOpts.atomicFlush = true - # Options recommended by rocksdb devs themselves, for new databases # https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options