Skip to content

Commit

Permalink
Update library to support column families. If not specified, uses the…
Browse files Browse the repository at this point in the history
… 'default' column family.
  • Loading branch information
bhartnett committed Feb 13, 2024
1 parent 5e2b026 commit 6277dbe
Showing 1 changed file with 125 additions and 53 deletions.
178 changes: 125 additions & 53 deletions rocksdb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@

{.push raises: [Defect].}

import cpuinfo, options, stew/[byteutils, results]

from system/ansi_c import c_free
import std/[cpuinfo, options, tables],
stew/[byteutils, results]

export results

Expand Down Expand Up @@ -40,6 +39,8 @@ type
readOptions*: rocksdb_readoptions_t
writeOptions: rocksdb_writeoptions_t
dbPath: string # needed for clear()
columnFamilyNames: cstringArray
columnFamilies: TableRef[cstring, rocksdb_column_family_handle_t]

DataProc* = proc(val: openArray[byte]) {.gcsafe, raises: [Defect].}

Expand All @@ -49,18 +50,33 @@ template bailOnErrors {.dirty.} =
if not errors.isNil:
result.err($errors)
rocksdb_free(errors)
return
return result

template validateColumnFamily(
db: RocksDBInstance,
columnFamily: string): rocksdb_column_family_handle_t =

if not db.columnFamilies.contains(columnFamily):
return err("rocksdb: invalid column family")

let columnFamilyHandle= db.columnFamilies.getOrDefault(columnFamily)
doAssert not columnFamilyHandle.isNil
columnFamilyHandle


proc init*(rocks: var RocksDBInstance,
dbPath, dbBackupPath: string,
readOnly = false,
cpus = countProcessors(),
createIfMissing = true,
maxOpenFiles = -1): RocksDBResult[void] =
maxOpenFiles = -1,
columnFamiliesNames: openArray[string] = @["default"]): RocksDBResult[void] =
rocks.options = rocksdb_options_create()
rocks.readOptions = rocksdb_readoptions_create()
rocks.writeOptions = rocksdb_writeoptions_create()
rocks.dbPath = dbPath
rocks.columnFamilyNames = columnFamiliesNames.allocCStringArray
rocks.columnFamilies = newTable[cstring, rocksdb_column_family_handle_t]()

# Optimize RocksDB. This is the easiest way to get RocksDB to perform well:
rocksdb_options_increase_parallelism(rocks.options, cpus.int32)
Expand All @@ -71,16 +87,42 @@ proc init*(rocks: var RocksDBInstance,
# default set to keep all files open (-1), allow setting it to a specific
# value, e.g. in case the application limit would be reached.
rocksdb_options_set_max_open_files(rocks.options, maxOpenFiles.cint)
# Enable creating column families if they do not exist
rocksdb_options_set_create_missing_column_families(rocks.options, uint8(true))

var errors: cstring
var
columnFamilyHandles = newSeq[rocksdb_column_family_handle_t](columnFamiliesNames.len)
errors: cstring
if readOnly:
rocks.db = rocksdb_open_for_read_only(rocks.options, dbPath, 0'u8, errors.addr)
rocks.db = rocksdb_open_for_read_only_column_families(
rocks.options,
dbPath,
columnFamiliesNames.len().cint,
rocks.columnFamilyNames,
rocks.options.addr, # TODO: test this. Might need to turn this into array of options
columnFamilyHandles[0].addr,
0'u8,
errors.addr)
else:
rocks.db = rocksdb_open(rocks.options, dbPath, errors.addr)
rocks.db = rocksdb_open_column_families(
rocks.options,
dbPath,
columnFamiliesNames.len().cint,
rocks.columnFamilyNames,
rocks.options.addr, # TODO: test this. Might need to turn this into array of options
columnFamilyHandles[0].addr,
errors.addr)
bailOnErrors()
rocks.backupEngine = rocksdb_backup_engine_open(rocks.options,
dbBackupPath, errors.addr)

for i in 0..<columnFamiliesNames.len:
rocks.columnFamilies[columnFamiliesNames[i].cstring] = columnFamilyHandles[i]

rocks.backupEngine = rocksdb_backup_engine_open(
rocks.options,
dbBackupPath,
errors.addr)
bailOnErrors()

ok()

template initRocksDB*(args: varargs[untyped]): Option[RocksDBInstance] =
Expand All @@ -90,33 +132,29 @@ template initRocksDB*(args: varargs[untyped]): Option[RocksDBInstance] =
else:
some(db)

template getImpl(T: type) {.dirty.} =
if key.len <= 0:
return err("rocksdb: key cannot be empty on get")

var
errors: cstring
len: csize_t
data = rocksdb_get(db.db, db.readOptions,
cast[cstring](unsafeAddr key[0]), csize_t(key.len),
addr len, addr errors)
bailOnErrors()
if not data.isNil:
result = ok(toOpenArray(data, 0, int(len) - 1).to(T))
rocksdb_free(data)
else:
result = err("")
proc get*(
db: RocksDBInstance,
key: openArray[byte],
onData: DataProc,
columnFamily = "default"): RocksDBResult[bool] =

proc get*(db: RocksDBInstance, key: openArray[byte], onData: DataProc): RocksDBResult[bool] =
if key.len <= 0:
return err("rocksdb: key cannot be empty on get")

let columnFamilyHandle = db.validateColumnFamily(columnFamily)

var
errors: cstring
len: csize_t
data = rocksdb_get(db.db, db.readOptions,
cast[cstring](unsafeAddr key[0]), csize_t(key.len),
addr len, addr errors)
data = rocksdb_get_cf(
db.db,
db.readOptions,
columnFamilyHandle,
cast[cstring](unsafeAddr key[0]),
csize_t(key.len),
addr len,
addr errors)

bailOnErrors()
if not data.isNil:
# TODO onData may raise a Defect - in theory we could catch it and free the
Expand Down Expand Up @@ -152,52 +190,79 @@ proc getBytes*(db: RocksDBInstance, key: openArray[byte]): RocksDBResult[seq[byt
else:
err("")

proc put*(db: RocksDBInstance, key, val: openArray[byte]): RocksDBResult[void] =
proc put*(
db: RocksDBInstance,
key, val: openArray[byte],
columnFamily = "default"): RocksDBResult[void] =

if key.len <= 0:
return err("rocksdb: key cannot be empty on put")

let columnFamilyHandle = db.validateColumnFamily(columnFamily)

var
errors: cstring

rocksdb_put(db.db, db.writeOptions,
cast[cstring](unsafeAddr key[0]), csize_t(key.len),
cast[cstring](if val.len > 0: unsafeAddr val[0] else: nil),
csize_t(val.len),
errors.addr)
rocksdb_put_cf(
db.db,
db.writeOptions,
columnFamilyHandle,
cast[cstring](unsafeAddr key[0]), csize_t(key.len),
cast[cstring](if val.len > 0: unsafeAddr val[0] else: nil),
csize_t(val.len),
errors.addr)

bailOnErrors()
ok()

proc contains*(db: RocksDBInstance, key: openArray[byte]): RocksDBResult[bool] =
proc contains*(db: RocksDBInstance, key: openArray[byte], columnFamily = "default"): RocksDBResult[bool] =
if key.len <= 0:
return err("rocksdb: key cannot be empty on contains")

let columnFamilyHandle = db.validateColumnFamily(columnFamily)

var
errors: cstring
len: csize_t
data = rocksdb_get(db.db, db.readOptions,
cast[cstring](unsafeAddr key[0]), csize_t(key.len),
addr len, errors.addr)
data = rocksdb_get_cf(
db.db,
db.readOptions,
columnFamilyHandle,
cast[cstring](unsafeAddr key[0]),
csize_t(key.len),
addr len,
errors.addr)
bailOnErrors()

if not data.isNil:
rocksdb_free(data)
ok(true)
else:
ok(false)

proc del*(db: RocksDBInstance, key: openArray[byte]): RocksDBResult[bool] =
proc del*(
db: RocksDBInstance,
key: openArray[byte],
columnFamily = "default"): RocksDBResult[bool] =

if key.len <= 0:
return err("rocksdb: key cannot be empty on del")

let columnFamilyHandle = db.validateColumnFamily(columnFamily)

# This seems like a bad idea, but right now I don't want to
# get sidetracked by this. --Adam
if not db.contains(key).get:
return ok(false)

var errors: cstring
rocksdb_delete(db.db, db.writeOptions,
cast[cstring](unsafeAddr key[0]), csize_t(key.len),
errors.addr)
rocksdb_delete_cf(
db.db,
db.writeOptions,
columnFamilyHandle,
cast[cstring](unsafeAddr key[0]),
csize_t(key.len),
errors.addr)
bailOnErrors()
ok(true)

Expand All @@ -214,23 +279,30 @@ proc backup*(db: RocksDBInstance): RocksDBResult[void] =
# https://github.com/nim-lang/Nim/issues/8112
# proc `=destroy`*(db: var RocksDBInstance) =
proc close*(db: var RocksDBInstance) =
template freeField(name) =
type FieldType = typeof db.`name`

if not db.columnFamilies.isNil:
for k, v in db.columnFamilies:
rocksdb_column_family_handle_destroy(v)
db.columnFamilies = nil

if not db.columnFamilyNames.isNil:
db.columnFamilyNames.deallocCStringArray()
db.columnFamilyNames = nil

template freeField(name) =
if db.`name`.isNil:
`rocksdb name destroy`(db.`name`)
db.`name` = FieldType(nil)
template setFieldToNil(name) =
type FieldType = typeof db.`name`
db.`name` = FieldType(nil)

db.`name` = nil

freeField(writeOptions)
freeField(readOptions)
freeField(options)

if not db.backupEngine.isNil:
rocksdb_backup_engine_close(db.backupEngine)
setFieldToNil(backupEngine)
db.backupEngine = nil

if not db.db.isNil:
rocksdb_close(db.db)
setFieldToNil(db)
db.db = nil

0 comments on commit 6277dbe

Please sign in to comment.