Skip to content

Commit

Permalink
Update API to support passing in ColFamilyHandleRef instead of column…
Browse files Browse the repository at this point in the history
… family string. (#52)

* Refactor API to pass in column family handle instead of string.

* Check for unknown column family in getColFamilyHandle and update tests.

* Update column family function names. Remove redundant exists check.
  • Loading branch information
bhartnett authored Jun 26, 2024
1 parent a691d5b commit ee15ce0
Show file tree
Hide file tree
Showing 15 changed files with 208 additions and 282 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ jobs:
include:
- target:
os: linux
builder: ubuntu-20.04
builder: ubuntu-latest
shell: bash
- target:
os: macos
builder: macos-11
builder: macos-12
shell: bash
- target:
os: windows
Expand Down
2 changes: 1 addition & 1 deletion rocksdb.nimble
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
packageName = "rocksdb"
version = "0.4.0"
version = "0.5.0"
author = "Status Research & Development GmbH"
description =
"A wrapper for Facebook's RocksDB, an embeddable, persistent key-value store for fast storage"
Expand Down
41 changes: 22 additions & 19 deletions rocksdb/columnfamily.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,41 +20,38 @@
{.push raises: [].}

import ./rocksdb
import ./columnfamily/cfhandle, ./rocksdb

export rocksdb

type
ColFamilyReadOnly* = object
db: RocksDbReadOnlyRef
name: string
handle: ColFamilyHandleRef

ColFamilyReadWrite* = object
db: RocksDbReadWriteRef
name: string
handle: ColFamilyHandleRef

proc withColFamily*(
proc getColFamily*(
db: RocksDbReadOnlyRef, name: string
): RocksDBResult[ColFamilyReadOnly] =
## Creates a new `ColFamilyReadOnly` from the given `RocksDbReadOnlyRef` and
## column family name.
doAssert not db.isClosed()

# validate that the column family exists
discard db.keyExists(@[0.byte], name).valueOr:
return err(error)
ok(ColFamilyReadOnly(db: db, name: name, handle: ?db.getColFamilyHandle(name)))

ok(ColFamilyReadOnly(db: db, name: name))

proc withColFamily*(
proc getColFamily*(
db: RocksDbReadWriteRef, name: string
): RocksDBResult[ColFamilyReadWrite] =
## Create a new `ColFamilyReadWrite` from the given `RocksDbReadWriteRef` and
## column family name.
doAssert not db.isClosed()

# validate that the column family exists
discard db.keyExists(@[0.byte], name).valueOr:
return err(error)

ok(ColFamilyReadWrite(db: db, name: name))
ok(ColFamilyReadWrite(db: db, name: name, handle: ?db.getColFamilyHandle(name)))

proc db*(cf: ColFamilyReadOnly | ColFamilyReadWrite): auto {.inline.} =
## Returns the underlying `RocksDbReadOnlyRef` or `RocksDbReadWriteRef`.
Expand All @@ -64,46 +61,52 @@ proc name*(cf: ColFamilyReadOnly | ColFamilyReadWrite): string {.inline.} =
## Returns the name of the column family.
cf.name

proc handle*(
cf: ColFamilyReadOnly | ColFamilyReadWrite
): ColFamilyHandleRef {.inline.} =
## Returns the name of the column family.
cf.handle

proc get*(
cf: ColFamilyReadOnly | ColFamilyReadWrite, key: openArray[byte], onData: DataProc
): RocksDBResult[bool] {.inline.} =
## Gets the value of the given key from the column family using the `onData`
## callback.
cf.db.get(key, onData, cf.name)
cf.db.get(key, onData, cf.handle)

proc get*(
cf: ColFamilyReadOnly | ColFamilyReadWrite, key: openArray[byte]
): RocksDBResult[seq[byte]] {.inline.} =
## Gets the value of the given key from the column family.
cf.db.get(key, cf.name)
cf.db.get(key, cf.handle)

proc put*(
cf: ColFamilyReadWrite, key, val: openArray[byte]
): RocksDBResult[void] {.inline.} =
## Puts a value for the given key into the column family.
cf.db.put(key, val, cf.name)
cf.db.put(key, val, cf.handle)

proc keyExists*(
cf: ColFamilyReadOnly | ColFamilyReadWrite, key: openArray[byte]
): RocksDBResult[bool] {.inline.} =
## Checks if the given key exists in the column family.
cf.db.keyExists(key, cf.name)
cf.db.keyExists(key, cf.handle)

proc delete*(
cf: ColFamilyReadWrite, key: openArray[byte]
): RocksDBResult[void] {.inline.} =
## Deletes the given key from the column family.
cf.db.delete(key, cf.name)
cf.db.delete(key, cf.handle)

proc openIterator*(
cf: ColFamilyReadOnly | ColFamilyReadWrite
): RocksDBResult[RocksIteratorRef] {.inline.} =
## Opens an `RocksIteratorRef` for the given column family.
cf.db.openIterator(cf.name)
cf.db.openIterator(cf.handle)

proc openWriteBatch*(cf: ColFamilyReadWrite): WriteBatchRef {.inline.} =
## Opens a `WriteBatchRef` for the given column family.
cf.db.openWriteBatch(cf.name)
cf.db.openWriteBatch(cf.handle)

proc write*(
cf: ColFamilyReadWrite, updates: WriteBatchRef
Expand Down
2 changes: 1 addition & 1 deletion rocksdb/internal/cftable.nim
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ proc newColFamilyTable*(
proc isClosed*(table: ColFamilyTableRef): bool {.inline.} =
table.columnFamilies.isNil()

proc get*(table: ColFamilyTableRef, name: string): ColFamilyHandleRef =
proc get*(table: ColFamilyTableRef, name: string): ColFamilyHandleRef {.inline.} =
table.columnFamilies.getOrDefault(name)

proc close*(table: ColFamilyTableRef) =
Expand Down
63 changes: 27 additions & 36 deletions rocksdb/rocksdb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type
path: string
dbOpts: DbOptionsRef
readOpts: ReadOptionsRef
defaultCfName: string
defaultCfHandle: ColFamilyHandleRef
cfTable: ColFamilyTableRef

RocksDbReadOnlyRef* = ref object of RocksDbRef
Expand Down Expand Up @@ -143,6 +143,7 @@ proc openRocksDb*(
dbOpts = useDbOpts # don't close on exit
readOpts = (if readOpts.isNil: defaultReadOptions() else: readOpts)
writeOpts = (if writeOpts.isNil: defaultWriteOptions() else: writeOpts)
cfTable = newColFamilyTable(cfNames.mapIt($it), cfHandles)
db = RocksDbReadWriteRef(
lock: createLock(),
cPtr: rocksDbPtr,
Expand All @@ -151,8 +152,8 @@ proc openRocksDb*(
readOpts: readOpts,
writeOpts: writeOpts,
ingestOptsPtr: rocksdb_ingestexternalfileoptions_create(),
defaultCfName: DEFAULT_COLUMN_FAMILY_NAME,
cfTable: newColFamilyTable(cfNames.mapIt($it), cfHandles),
defaultCfHandle: cfTable.get(DEFAULT_COLUMN_FAMILY_NAME),
cfTable: cfTable,
)
ok(db)

Expand Down Expand Up @@ -198,17 +199,27 @@ proc openRocksDbReadOnly*(
let
dbOpts = useDbOpts # don't close on exit
readOpts = (if readOpts.isNil: defaultReadOptions() else: readOpts)
cfTable = newColFamilyTable(cfNames.mapIt($it), cfHandles)
db = RocksDbReadOnlyRef(
lock: createLock(),
cPtr: rocksDbPtr,
path: path,
dbOpts: dbOpts,
readOpts: readOpts,
defaultCfName: DEFAULT_COLUMN_FAMILY_NAME,
cfTable: newColFamilyTable(cfNames.mapIt($it), cfHandles),
defaultCfHandle: cfTable.get(DEFAULT_COLUMN_FAMILY_NAME),
cfTable: cfTable,
)
ok(db)

proc getColFamilyHandle*(
db: RocksDbRef, name: string
): RocksDBResult[ColFamilyHandleRef] =
let cfHandle = db.cfTable.get(name)
if cfHandle.isNil():
err("rocksdb: unknown column family")
else:
ok(cfHandle)

proc isClosed*(db: RocksDbRef): bool {.inline.} =
## Returns `true` if the database has been closed and `false` otherwise.
db.cPtr.isNil()
Expand All @@ -222,7 +233,7 @@ proc get*(
db: RocksDbRef,
key: openArray[byte],
onData: DataProc,
columnFamily = db.defaultCfName,
cfHandle = db.defaultCfHandle,
): RocksDBResult[bool] =
## Get the value for the given key from the specified column family.
## If the value does not exist, `false` will be returned in the result
Expand All @@ -234,10 +245,6 @@ proc get*(
if key.len() == 0:
return err("rocksdb: key is empty")

let cfHandle = db.cfTable.get(columnFamily)
if cfHandle.isNil():
return err("rocksdb: unknown column family")

var
len: csize_t
errors: cstring
Expand All @@ -261,7 +268,7 @@ proc get*(
ok(true)

proc get*(
db: RocksDbRef, key: openArray[byte], columnFamily = db.defaultCfName
db: RocksDbRef, key: openArray[byte], cfHandle = db.defaultCfHandle
): RocksDBResult[seq[byte]] =
## Get the value for the given key from the specified column family.
## If the value does not exist, an empty error will be returned in the result.
Expand All @@ -271,24 +278,20 @@ proc get*(
proc onData(data: openArray[byte]) =
dataRes.ok(@data)

let res = db.get(key, onData, columnFamily)
let res = db.get(key, onData, cfHandle)
if res.isOk():
return dataRes

dataRes.err(res.error())

proc put*(
db: RocksDbReadWriteRef, key, val: openArray[byte], columnFamily = db.defaultCfName
db: RocksDbReadWriteRef, key, val: openArray[byte], cfHandle = db.defaultCfHandle
): RocksDBResult[void] =
## Put the value for the given key into the specified column family.

if key.len() == 0:
return err("rocksdb: key is empty")

let cfHandle = db.cfTable.get(columnFamily)
if cfHandle.isNil():
return err("rocksdb: unknown column family")

var errors: cstring
rocksdb_put_cf(
db.cPtr,
Expand All @@ -309,7 +312,7 @@ proc put*(
ok()

proc keyExists*(
db: RocksDbRef, key: openArray[byte], columnFamily = db.defaultCfName
db: RocksDbRef, key: openArray[byte], cfHandle = db.defaultCfHandle
): RocksDBResult[bool] =
## Check if the key exists in the specified column family.
## Returns a result containing `true` if the key exists or a result
Expand All @@ -323,11 +326,11 @@ proc keyExists*(
proc(data: openArray[byte]) =
discard
,
columnFamily,
cfHandle,
)

proc delete*(
db: RocksDbReadWriteRef, key: openArray[byte], columnFamily = db.defaultCfName
db: RocksDbReadWriteRef, key: openArray[byte], cfHandle = db.defaultCfHandle
): RocksDBResult[void] =
## Delete the value for the given key from the specified column family.
## If the value does not exist, the delete will be a no-op.
Expand All @@ -336,10 +339,6 @@ proc delete*(
if key.len() == 0:
return err("rocksdb: key is empty")

let cfHandle = db.cfTable.get(columnFamily)
if cfHandle.isNil:
return err("rocksdb: unknown column family")

var errors: cstring
rocksdb_delete_cf(
db.cPtr,
Expand All @@ -354,27 +353,23 @@ proc delete*(
ok()

proc openIterator*(
db: RocksDbRef, columnFamily = db.defaultCfName
db: RocksDbRef, cfHandle = db.defaultCfHandle
): RocksDBResult[RocksIteratorRef] =
## Opens an `RocksIteratorRef` for the specified column family.
doAssert not db.isClosed()

let cfHandle = db.cfTable.get(columnFamily)
if cfHandle.isNil():
return err("rocksdb: unknown column family")

let rocksIterPtr =
rocksdb_create_iterator_cf(db.cPtr, db.readOpts.cPtr, cfHandle.cPtr)

ok(newRocksIterator(rocksIterPtr))

proc openWriteBatch*(
db: RocksDbReadWriteRef, columnFamily = db.defaultCfName
db: RocksDbReadWriteRef, cfHandle = db.defaultCfHandle
): WriteBatchRef =
## Opens a `WriteBatchRef` which defaults to using the specified column family.
doAssert not db.isClosed()

newWriteBatch(db.cfTable, columnFamily)
newWriteBatch(cfHandle)

proc write*(db: RocksDbReadWriteRef, updates: WriteBatchRef): RocksDBResult[void] =
## Apply the updates in the `WriteBatchRef` to the database.
Expand All @@ -389,17 +384,13 @@ proc write*(db: RocksDbReadWriteRef, updates: WriteBatchRef): RocksDBResult[void
ok()

proc ingestExternalFile*(
db: RocksDbReadWriteRef, filePath: string, columnFamily = db.defaultCfName
db: RocksDbReadWriteRef, filePath: string, cfHandle = db.defaultCfHandle
): RocksDBResult[void] =
## Ingest an external sst file into the database. The file will be ingested
## into the specified column family or the default column family if none is
## provided.
doAssert not db.isClosed()

let cfHandle = db.cfTable.get(columnFamily)
if cfHandle.isNil():
return err("rocksdb: unknown column family")

var
sstPath = filePath.cstring
errors: cstring
Expand Down
18 changes: 15 additions & 3 deletions rocksdb/transactiondb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type
path: string
dbOpts: DbOptionsRef
txDbOpts: TransactionDbOptionsRef
defaultCfHandle: ColFamilyHandleRef
cfTable: ColFamilyTableRef

proc openTransactionDb*(
Expand Down Expand Up @@ -79,16 +80,27 @@ proc openTransactionDb*(
txDbOpts = (if txDbOpts.isNil: defaultTransactionDbOptions()
else: txDbOpts
)
cfTable = newColFamilyTable(cfNames.mapIt($it), cfHandles)
db = TransactionDbRef(
lock: createLock(),
cPtr: txDbPtr,
path: path,
dbOpts: dbOpts,
txDbOpts: txDbOpts,
cfTable: newColFamilyTable(cfNames.mapIt($it), cfHandles),
defaultCfHandle: cfTable.get(DEFAULT_COLUMN_FAMILY_NAME),
cfTable: cfTable,
)
ok(db)

proc getColFamilyHandle*(
db: TransactionDbRef, name: string
): RocksDBResult[ColFamilyHandleRef] =
let cfHandle = db.cfTable.get(name)
if cfHandle.isNil():
err("rocksdb: unknown column family")
else:
ok(cfHandle)

proc isClosed*(db: TransactionDbRef): bool {.inline.} =
## Returns `true` if the `TransactionDbRef` has been closed.
db.cPtr.isNil()
Expand All @@ -99,7 +111,7 @@ proc beginTransaction*(
writeOpts = WriteOptionsRef(nil),
txDbOpts = TransactionDbOptionsRef(nil),
txOpts = defaultTransactionOptions(),
columnFamily = DEFAULT_COLUMN_FAMILY_NAME,
cfHandle = db.defaultCfHandle,
): TransactionRef =
## Begin a new transaction against the database. The transaction will default
## to using the specified column family. If no column family is specified
Expand All @@ -114,7 +126,7 @@ proc beginTransaction*(

let txPtr = rocksdb_transaction_begin(db.cPtr, writeOpts.cPtr, txOpts.cPtr, nil)

newTransaction(txPtr, readOpts, writeOpts, txOpts, columnFamily, db.cfTable)
newTransaction(txPtr, readOpts, writeOpts, txOpts, cfHandle)

proc close*(db: TransactionDbRef) =
## Close the `TransactionDbRef`.
Expand Down
Loading

0 comments on commit ee15ce0

Please sign in to comment.