Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Table store 2 #786

Draft
wants to merge 30 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c939ace
Created table store.
cody-littley Sep 19, 2024
8889ea2
Incremental progress.
cody-littley Sep 20, 2024
967b01c
Simplify by making max table count fixed.
cody-littley Sep 20, 2024
3f65d47
Use last table indices for internal tables.
cody-littley Sep 20, 2024
8894e43
Make table deletion atomic.
cody-littley Sep 20, 2024
61d8407
Cleanup.
cody-littley Sep 20, 2024
58d321c
Added table view.
cody-littley Sep 23, 2024
a85f0ee
Fix table view iteration.
cody-littley Sep 23, 2024
4a37778
Write some unit tests.
cody-littley Sep 24, 2024
4ebe6d0
checkin
cody-littley Sep 24, 2024
2695d49
Add random test, fix bugs.
cody-littley Sep 24, 2024
b288669
Added logging.
cody-littley Sep 24, 2024
0913aff
Fix lint issues.
cody-littley Sep 24, 2024
fc4b707
lint
cody-littley Sep 25, 2024
27d7f0b
Checkin.
cody-littley Sep 30, 2024
98bdde9
Small tweak to naming.
cody-littley Sep 30, 2024
5bed795
Finished conversion, now for the unit tests
cody-littley Sep 30, 2024
b6cd49b
Partially migrate to new API.
cody-littley Sep 30, 2024
efc7be6
Now compiles.
cody-littley Sep 30, 2024
0dc38c2
Get store tests working.
cody-littley Sep 30, 2024
a11e3b4
Unit tests are passing.
cody-littley Sep 30, 2024
dfe422a
Cleanup.
cody-littley Sep 30, 2024
091e6bb
Cleanup and unit tests.
cody-littley Sep 30, 2024
39a3521
Cleanup.
cody-littley Sep 30, 2024
3921d4a
Made suggested changes.
cody-littley Oct 3, 2024
ca76d4c
Made suggested changes.
cody-littley Oct 4, 2024
f424892
Refactored builder, need to convert unit tests
cody-littley Oct 7, 2024
6d0c2b5
Things compile, many tests commented out
cody-littley Oct 7, 2024
551e6cf
Enable more unit tests.
cody-littley Oct 7, 2024
2117cd6
Enabled remainder of tests.
cody-littley Oct 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions common/kvstore/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kvstore

// Batch is a collection of operations that can be applied atomically to a TableStore.
type Batch[T any] interface {
// Put stores the given key / value pair in the batch, overwriting any existing value for that key.
// If nil is passed as the value, a byte slice of length 0 will be stored.
Put(key T, value []byte)
// Delete removes the key from the batch.
Delete(key T)
// Apply atomically writes all the key / value pairs in the batch to the database.
Apply() error
// Size returns the number of operations in the batch.
Size() uint32
}
36 changes: 36 additions & 0 deletions common/kvstore/leveldb/leveldb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func NewStore(logger logging.Logger, path string) (kvstore.Store, error) {

// Put stores a data in the store.
func (store *levelDBStore) Put(key []byte, value []byte) error {
if value == nil {
value = []byte{}
}
return store.db.Put(key, value, nil)
}

Expand Down Expand Up @@ -84,6 +87,39 @@ func (store *levelDBStore) WriteBatch(keys [][]byte, values [][]byte) error {
return store.db.Write(batch, nil)
}

// NewBatch creates a new batch for the store.
func (store *levelDBStore) NewBatch() kvstore.Batch[[]byte] {
return &levelDBBatch{
store: store,
batch: new(leveldb.Batch),
}
}

type levelDBBatch struct {
store *levelDBStore
batch *leveldb.Batch
}

func (m *levelDBBatch) Put(key []byte, value []byte) {
if value == nil {
value = []byte{}
}
m.batch.Put(key, value)
}

func (m *levelDBBatch) Delete(key []byte) {
m.batch.Delete(key)
}

func (m *levelDBBatch) Apply() error {
return m.store.db.Write(m.batch, nil)
}

// Size returns the number of operations in the batch.
func (m *levelDBBatch) Size() uint32 {
return uint32(m.batch.Len())
}

// Shutdown shuts down the store.
//
// Warning: it is not thread safe to call this method concurrently with other methods on this class,
Expand Down
58 changes: 58 additions & 0 deletions common/kvstore/mapstore/map_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func NewStore() kvstore.Store {

// Put adds a key-value pair to the store.
func (store *mapStore) Put(key []byte, value []byte) error {
if value == nil {
value = []byte{}
}

store.lock.Lock()
defer store.lock.Unlock()

Expand Down Expand Up @@ -74,6 +78,60 @@ func (store *mapStore) WriteBatch(keys, values [][]byte) error {
return nil
}

// NewBatch creates a new batch for the store.
func (store *mapStore) NewBatch() kvstore.Batch[[]byte] {
return &batch{
store: store,
keys: make([][]byte, 0),
values: make([][]byte, 0),
}
}

// batch is a collection of operations that can be applied atomically to a mapStore.
type batch struct {
store *mapStore
keys [][]byte
values [][]byte
}

// Put stores the given key / value pair in the batch, overwriting any existing value for that key.
func (m *batch) Put(key []byte, value []byte) {
if value == nil {
value = []byte{}
}
m.keys = append(m.keys, key)
m.values = append(m.values, value)
}

// Delete removes the key from the batch. Does not return an error if the key does not exist.
func (m *batch) Delete(key []byte) {
m.keys = append(m.keys, key)
m.values = append(m.values, nil)
}

// Apply atomically writes & deletes all the key / value pairs in the batch to the database.
func (m *batch) Apply() error {
for i, key := range m.keys {
if m.values[i] == nil {
err := m.store.Delete(key)
if err != nil {
return err
}
} else {
err := m.store.Put(key, m.values[i])
if err != nil {
return err
}
}
}
return nil
}

// Size returns the number of operations in the batch.
func (m *batch) Size() uint32 {
return uint32(len(m.keys))
}

type mapIterator struct {
keys []string
values map[string][]byte
Expand Down
15 changes: 6 additions & 9 deletions common/kvstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ import (
"github.com/syndtr/goleveldb/leveldb/iterator"
)

// ErrNotFound is returned when a key is not found in the database.
var ErrNotFound = errors.New("not found")

// Store implements a key-value store. May be backed by a database like LevelDB.
//
// Implementations of this interface are expected to be thread-safe.
type Store interface {

// Put stores the given key / value pair in the database, overwriting any existing value for that key.
// If nil is passed as the value, a byte slice of length 0 will be stored.
Put(key []byte, value []byte) error

// Get retrieves the value for the given key. Returns a ErrNotFound error if the key does not exist.
Expand All @@ -20,12 +24,8 @@ type Store interface {
// Delete removes the key from the database. Does not return an error if the key does not exist.
Delete(key []byte) error

// DeleteBatch atomically removes a list of keys from the database.
DeleteBatch(keys [][]byte) error

// WriteBatch atomically writes a list of key / value pairs to the database. The key at index i in the keys slice
// corresponds to the value at index i in the values slice.
WriteBatch(keys, values [][]byte) error
// NewBatch creates a new batch that can be used to perform multiple operations atomically.
NewBatch() Batch[[]byte]

// NewIterator returns an iterator that can be used to iterate over a subset of the keys in the database.
// Only keys with the given prefix will be iterated. The iterator must be closed by calling Release() when done.
Expand All @@ -45,6 +45,3 @@ type Store interface {
// or while there exist unclosed iterators.
Destroy() error
}

// ErrNotFound is returned when a key is not found in the database.
var ErrNotFound = errors.New("not found")
87 changes: 87 additions & 0 deletions common/kvstore/table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package kvstore

import "errors"

// TableKey is a key scoped to a particular table. It can be used to perform batch operations that modify multiple
// table keys atomically.
type TableKey []byte

// Table can be used to operate on data in a specific table in a TableStore.
type Table interface {
// Store permits access to the table as if it were a store.
Store

// Name returns the name of the table.
Name() string

// TableKey creates a new key scoped to this table that can be used for batch operations that modify this table.
TableKey(key []byte) TableKey
}

// ErrTableLimitExceeded is returned when the maximum number of tables has been reached.
var ErrTableLimitExceeded = errors.New("table limit exceeded")

// ErrTableNotFound is returned when a table is not found.
var ErrTableNotFound = errors.New("table not found")

// TableStoreBuilder is used to create a new TableStore instance. It can be used to add and remove
// tables from the store. Once the TableStore is created, the TableStoreBuilder instance should not be used again,
// and no tables may be added or removed from the store. If table modifications are required on an existing
// TableStore, it should first be shut down and a new TableStoreBuilder should be created.
type TableStoreBuilder interface {

// CreateTable creates a new table with the given name. If a table with the given name already exists,
// this method becomes a no-op. Returns ErrTableLimitExceeded if the maximum number of tables has been reached.
CreateTable(name string) error

// DropTable deletes the table with the given name. If the table does not exist, this method becomes a no-op.
DropTable(name string) error

// GetMaxTableCount returns the maximum number of tables that can be created in the store
// (excluding internal tables utilized by the store).
GetMaxTableCount() uint32

// GetTableCount returns the current number of tables in the store
// (excluding internal tables utilized by the store).
GetTableCount() uint32

// GetTableNames returns a list of the names of all tables in the store, in no particular order.
GetTableNames() []string

// Build creates a new TableStore instance with the specified tables. After this method is called,
// the TableStoreBuilder should not be used again.
Build() (TableStore, error)

// Shutdown shuts down the store, flushing any remaining data to disk.
Shutdown() error

// Destroy shuts down and permanently deletes all data in the store.
Destroy() error
}

// TableStore implements a key-value store, with the addition of the abstraction of tables.
// A "table" in this context is a disjoint keyspace. Keys in one table to not collide with keys in another table,
// and keys within a particular table can be iterated over efficiently.
//
// A TableStore is only required to support a maximum of 2^32-X unique, where X is a small integer number of tables
// reserved for internal use by the table store. The exact value of X is implementation dependent.
//
// Implementations of this interface are expected to be thread-safe, except where noted.
type TableStore interface {

// GetTable gets the table with the given name. If the table does not exist, it is first created.
// Returns ErrTableNotFound if the table does not exist and cannot be created.
GetTable(name string) (Table, error)

// GetTables returns a list of all tables in the store in no particular order.
GetTables() []Table

// NewBatch creates a new batch that can be used to perform multiple operations across tables atomically.
NewBatch() Batch[TableKey]

// Shutdown shuts down the store, flushing any remaining data to disk.
Shutdown() error

// Destroy shuts down and permanently deletes all data in the store.
Destroy() error
}
74 changes: 74 additions & 0 deletions common/kvstore/tablestore/table_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package tablestore

import (
"github.com/Layr-Labs/eigenda/common/kvstore"
"github.com/Layr-Labs/eigensdk-go/logging"
)

var _ kvstore.TableStore = &tableStore{}

// tableStore is an implementation of TableStore that wraps a Store.
type tableStore struct {
logger logging.Logger

// A base store implementation that this TableStore wraps.
base kvstore.Store

// A map from table names to tables.
tableMap map[string]kvstore.Table
}

// wrapper wraps the given Store to create a TableStore.
//
// WARNING: it is not safe to access the wrapped store directly while the TableStore is in use. The TableStore uses
// special key formatting, and direct access to the wrapped store may violate the TableStore's invariants, resulting
// in undefined behavior.
func newTableStore(
logger logging.Logger,
base kvstore.Store,
tables map[string]kvstore.Table) kvstore.TableStore {

return &tableStore{
logger: logger,
base: base,
tableMap: tables,
}
}

// GetTable gets the table with the given name. If the table does not exist, it is first created.
func (t *tableStore) GetTable(name string) (kvstore.Table, error) {
table, ok := t.tableMap[name]
if !ok {
return nil, kvstore.ErrTableNotFound
}

return table, nil
}

// GetTables returns a list of all tables in the store in no particular order.
func (t *tableStore) GetTables() []kvstore.Table {
tables := make([]kvstore.Table, 0, len(t.tableMap))
for _, table := range t.tableMap {
tables = append(tables, table)
}

return tables
}

// NewBatch creates a new batch for writing to the store.
func (t *tableStore) NewBatch() kvstore.Batch[kvstore.TableKey] {
return &tableStoreBatch{
store: t,
batch: t.base.NewBatch(),
}
}

// Shutdown shuts down the store, flushing any remaining cached data to disk.
func (t *tableStore) Shutdown() error {
return t.base.Shutdown()
}

// Destroy shuts down and permanently deletes all data in the store.
func (t *tableStore) Destroy() error {
return t.base.Destroy()
}
32 changes: 32 additions & 0 deletions common/kvstore/tablestore/table_store_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package tablestore

import "github.com/Layr-Labs/eigenda/common/kvstore"

// tableStoreBatch is a batch for writing to a table store.
type tableStoreBatch struct {
store *tableStore
batch kvstore.Batch[[]byte]
}

// Put adds a key-value pair to the batch.
func (t *tableStoreBatch) Put(key kvstore.TableKey, value []byte) {
if value == nil {
value = []byte{}
}
t.batch.Put(key, value)
}

// Delete removes a key-value pair from the batch.
func (t *tableStoreBatch) Delete(key kvstore.TableKey) {
t.batch.Delete(key)
}

// Apply applies the batch to the store.
func (t *tableStoreBatch) Apply() error {
return t.batch.Apply()
}

// Size returns the number of operations in the batch.
func (t *tableStoreBatch) Size() uint32 {
return t.batch.Size()
}
Loading
Loading