Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose memtable flush #72

Merged
merged 3 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rocksdb/options/dbopts.nim
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ proc defaultDbOptions*(autoClose = false): DbOptionsRef =
# Enable creating column families if they do not exist
dbOpts.createMissingColumnFamilies = true

# Make sure flush is atomic accross column families
dbOpts.atomicFlush = true
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not something we want to enable by default, ie it carries greater (performance) implications than is obvious

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok thanks, will remove


# Options recommended by rocksdb devs themselves, for new databases
# https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options

Expand Down
43 changes: 43 additions & 0 deletions rocksdb/rocksdb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export
type
RocksDbPtr* = ptr rocksdb_t
IngestExternalFilesOptionsPtr = ptr rocksdb_ingestexternalfileoptions_t
FlushOptionsPtr = ptr rocksdb_flushoptions_t

RocksDbRef* = ref object of RootObj
lock: Lock
Expand All @@ -57,6 +58,7 @@ type
RocksDbReadWriteRef* = ref object of RocksDbRef
writeOpts: WriteOptionsRef
ingestOptsPtr: IngestExternalFilesOptionsPtr
flushOptsPtr: FlushOptionsPtr

proc listColumnFamilies*(path: string): RocksDBResult[seq[string]] =
## List exisiting column families on disk. This might be used to find out
Expand Down Expand Up @@ -135,6 +137,9 @@ proc openRocksDb*(
autoCloseNonNil(writeOpts)
autoCloseAll(cfs)

let flushOptsPtr = rocksdb_flushoptions_create()
rocksdb_flushoptions_set_wait(flushOptsPtr, 1)

let
cfTable = newColFamilyTable(cfNames.mapIt($it), cfHandles)
db = RocksDbReadWriteRef(
Expand All @@ -144,6 +149,7 @@ proc openRocksDb*(
dbOpts: dbOpts,
readOpts: readOpts,
writeOpts: writeOpts,
flushOptsPtr: flushOptsPtr,
cfDescriptors: cfs,
ingestOptsPtr: rocksdb_ingestexternalfileoptions_create(),
defaultCfHandle: cfTable.get(DEFAULT_COLUMN_FAMILY_NAME),
Expand Down Expand Up @@ -465,6 +471,40 @@ proc releaseSnapshot*(db: RocksDbRef, snapshot: SnapshotRef) =
rocksdb_release_snapshot(db.cPtr, snapshot.cPtr)
snapshot.setClosed()

proc flush*(
db: RocksDbReadWriteRef, cfHandle = db.defaultCfHandle
): RocksDBResult[void] =
## Flush all memory table data for the given column family.
doAssert not db.isClosed()

var errors: cstring
rocksdb_flush_cf(
db.cPtr, db.flushOptsPtr, cfHandle.cPtr, cast[cstringArray](errors.addr)
)
bailOnErrors(errors)

ok()

proc flush*(
db: RocksDbReadWriteRef, cfHandles: openArray[ColFamilyHandleRef]
): RocksDBResult[void] =
## Flush all memory table data for the given column families.
doAssert not db.isClosed()

var
cfs = cfHandles.mapIt(it.cPtr)
errors: cstring
rocksdb_flush_cfs(
db.cPtr,
db.flushOptsPtr,
addr cfs[0],
cint(cfs.len),
cast[cstringArray](errors.addr),
)
bailOnErrors(errors)

ok()

proc close*(db: RocksDbRef) =
## Close the `RocksDbRef` which will release the connection to the database
## and free the memory associated with it. `close` is idempotent and can
Expand All @@ -490,3 +530,6 @@ proc close*(db: RocksDbRef) =

rocksdb_ingestexternalfileoptions_destroy(db.ingestOptsPtr)
db.ingestOptsPtr = nil

rocksdb_flushoptions_destroy(db.flushOptsPtr)
db.flushOptsPtr = nil
15 changes: 15 additions & 0 deletions tests/test_rocksdb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -499,3 +499,18 @@ suite "RocksDbRef Tests":

db.releaseSnapshot(snapshot)
check snapshot.isClosed()

test "Test flush":
check:
db.put(key, val).isOk()
db.flush().isOk()

check:
db.put(otherKey, val, otherCfHandle).isOk()
db.flush(otherCfHandle).isOk()

let cfHandles = [defaultCfHandle, otherCfHandle]
check:
db.put(otherKey, val, defaultCfHandle).isOk()
db.put(key, val, otherCfHandle).isOk()
db.flush(cfHandles).isOk()