Skip to content

Commit

Permalink
expose more pebble open options, add compact/debt and compact/inprogr…
Browse files Browse the repository at this point in the history
…ess metrics
  • Loading branch information
magicxyyz committed Apr 20, 2024
1 parent cb4fc77 commit 935cb21
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 25 deletions.
10 changes: 6 additions & 4 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ func NewLevelDBDatabase(file string, cache int, handles int, namespace string, r

// NewPebbleDBDatabase creates a persistent key-value database without a freezer
// moving immutable chain segments into cold storage.
func NewPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly, ephemeral bool) (ethdb.Database, error) {
db, err := pebble.New(file, cache, handles, namespace, readonly, ephemeral)
func NewPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly, ephemeral bool, extraOptions *pebble.ExtraOptions) (ethdb.Database, error) {
db, err := pebble.New(file, cache, handles, namespace, readonly, ephemeral, extraOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -366,6 +366,8 @@ type OpenOptions struct {
// Ephemeral means that filesystem sync operations should be avoided: data integrity in the face of
// a crash is not important. This option should typically be used in tests.
Ephemeral bool

PebbleExtraOptions *pebble.ExtraOptions
}

// openKeyValueDatabase opens a disk-based key-value database, e.g. leveldb or pebble.
Expand All @@ -387,15 +389,15 @@ func openKeyValueDatabase(o OpenOptions) (ethdb.Database, error) {
}
if o.Type == dbPebble || existingDb == dbPebble {
log.Info("Using pebble as the backing database")
return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral)
return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral, o.PebbleExtraOptions)
}
if o.Type == dbLeveldb || existingDb == dbLeveldb {
log.Info("Using leveldb as the backing database")
return NewLevelDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly)
}
// No pre-existing database, no user-requested one either. Default to Pebble.
log.Info("Defaulting to pebble as the backing database")
return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral)
return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral, o.PebbleExtraOptions)
}

// Open opens both a disk-based key-value database such as leveldb or pebble, but also
Expand Down
27 changes: 27 additions & 0 deletions ethdb/pebble/extraoptions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package pebble

import "time"

type ExtraOptions struct {
BytesPerSync int
L0CompactionFileThreshold int
L0CompactionThreshold int
L0StopWritesThreshold int
LBaseMaxBytes int64
MaxConcurrentCompactions func() int
DisableAutomaticCompactions bool
WALBytesPerSync int
WALDir string
WALMinSyncInterval func() time.Duration
TargetByteDeletionRate int
Experimental ExtraOptionsExperimental
}

type ExtraOptionsExperimental struct {
L0CompactionConcurrency int
CompactionDebtConcurrency uint64
ReadCompactionRate int64
ReadSamplingMultiplier int64
MaxWriterConcurrency int
ForceWriterParallelism bool
}
53 changes: 46 additions & 7 deletions ethdb/pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (

// metricsGatheringInterval specifies the interval to retrieve pebble database
// compaction, io and pause stats to report to the user.
metricsGatheringInterval = 3 * time.Second
metricsGatheringInterval = 100 * time.Millisecond //3 * time.Second
)

// Database is a persistent key-value store based on the pebble storage engine.
Expand All @@ -71,6 +71,9 @@ type Database struct {
seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
manualMemAllocGauge metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated

compDebtGauge metrics.Gauge
compInProgressGauge metrics.Gauge

levelsGauge []metrics.Gauge // Gauge for tracking the number of tables in levels

quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag
Expand Down Expand Up @@ -138,7 +141,7 @@ func (l panicLogger) Fatalf(format string, args ...interface{}) {

// New returns a wrapped pebble DB object. The namespace is the prefix that the
// metrics reporting should use for surfacing internal stats.
func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool) (*Database, error) {
func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool, extraOptions *ExtraOptions) (*Database, error) {
// Ensure we have some minimal caching and file guarantees
if cache < minCache {
cache = minCache
Expand Down Expand Up @@ -181,7 +184,16 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
quitChan: make(chan chan error),
writeOptions: &pebble.WriteOptions{Sync: !ephemeral},
}
opt := &pebble.Options{

if extraOptions == nil {
extraOptions = &ExtraOptions{}
}
if extraOptions.MaxConcurrentCompactions == nil {
extraOptions.MaxConcurrentCompactions = func() int { return runtime.NumCPU() }
}

var opt *pebble.Options

Check failure on line 195 in ethdb/pebble/pebble.go

View workflow job for this annotation

GitHub Actions / run-linter

S1021: should merge variable declaration with assignment on next line (gosimple)
opt = &pebble.Options{
// Pebble has a single combined cache area and the write
// buffers are taken from this too. Assign all available
// memory allowance for cache.
Expand All @@ -195,16 +207,16 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
// MemTableStopWritesThreshold places a hard limit on the size
// of the existent MemTables(including the frozen one).
// Note, this must be the number of tables not the size of all memtables
// according to https://github.com/cockroachdb/pebble/blob/master/options.go#L738-L742
// according to https://github.com/cockroachdb/pebble/blob/master/extraOptions.go#L738-L742
// and to https://github.com/cockroachdb/pebble/blob/master/db.go#L1892-L1903.
MemTableStopWritesThreshold: memTableLimit,

// The default compaction concurrency(1 thread),
// Here use all available CPUs for faster compaction.
MaxConcurrentCompactions: func() int { return runtime.NumCPU() },
MaxConcurrentCompactions: extraOptions.MaxConcurrentCompactions,

// Per-level options. Options for at least one level must be specified. The
// options for the last level are used for all subsequent levels.
// Per-level extraOptions. Options for at least one level must be specified. The
// extraOptions for the last level are used for all subsequent levels.
Levels: []pebble.LevelOptions{
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
Expand All @@ -222,11 +234,32 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
WriteStallEnd: db.onWriteStallEnd,
},
Logger: panicLogger{}, // TODO(karalabe): Delete when this is upstreamed in Pebble

BytesPerSync: extraOptions.BytesPerSync,
L0CompactionFileThreshold: extraOptions.L0CompactionFileThreshold,
L0CompactionThreshold: extraOptions.L0CompactionThreshold,
L0StopWritesThreshold: extraOptions.L0StopWritesThreshold,
LBaseMaxBytes: extraOptions.LBaseMaxBytes,
DisableAutomaticCompactions: extraOptions.DisableAutomaticCompactions,
WALBytesPerSync: extraOptions.WALBytesPerSync,
WALDir: extraOptions.WALDir,
WALMinSyncInterval: extraOptions.WALMinSyncInterval,
TargetByteDeletionRate: extraOptions.TargetByteDeletionRate,
}

// Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130
// for more details.
opt.Experimental.ReadSamplingMultiplier = -1

if opt.Experimental.ReadSamplingMultiplier != 0 {
opt.Experimental.ReadSamplingMultiplier = extraOptions.Experimental.ReadSamplingMultiplier
}
opt.Experimental.L0CompactionConcurrency = extraOptions.Experimental.L0CompactionConcurrency
opt.Experimental.CompactionDebtConcurrency = extraOptions.Experimental.CompactionDebtConcurrency
opt.Experimental.ReadCompactionRate = extraOptions.Experimental.ReadCompactionRate
opt.Experimental.MaxWriterConcurrency = extraOptions.Experimental.MaxWriterConcurrency
opt.Experimental.ForceWriterParallelism = extraOptions.Experimental.ForceWriterParallelism

// Open the db and recover any potential corruptions
innerDB, err := pebble.Open(file, opt)
if err != nil {
Expand All @@ -248,6 +281,9 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
db.seekCompGauge = metrics.NewRegisteredGauge(namespace+"compact/seek", nil)
db.manualMemAllocGauge = metrics.NewRegisteredGauge(namespace+"memory/manualalloc", nil)

db.compDebtGauge = metrics.NewRegisteredGauge(namespace+"compact/debt", nil)
db.compInProgressGauge = metrics.NewRegisteredGauge(namespace+"compact/inprogress", nil)

// Start up the metrics gathering and return
go db.meter(metricsGatheringInterval, namespace)
return db, nil
Expand Down Expand Up @@ -525,6 +561,9 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
d.level0CompGauge.Update(level0CompCount)
d.seekCompGauge.Update(stats.Compact.ReadCount)

d.compDebtGauge.Update(int64(stats.Compact.EstimatedDebt))
d.compInProgressGauge.Update(stats.Compact.NumInProgress)

for i, level := range stats.Levels {
// Append metrics for additional layers
if i >= len(d.levelsGauge) {
Expand Down
2 changes: 1 addition & 1 deletion ethdb/pebble/pebble_non64bit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
)

func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool) (ethdb.Database, error) {
func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool, extraOptions *ExtraOptions) (ethdb.Database, error) {
return nil, errors.New("pebble is not supported on this platform")
}
37 changes: 24 additions & 13 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/pebble"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
Expand Down Expand Up @@ -744,6 +745,10 @@ func (n *Node) EventMux() *event.TypeMux {
// previous can be found) from within the node's instance directory. If the node is
// ephemeral, a memory database is returned.
func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, readonly bool) (ethdb.Database, error) {
return n.OpenDatabaseWithExtraOptions(name, cache, handles, namespace, readonly, nil)
}

func (n *Node) OpenDatabaseWithExtraOptions(name string, cache, handles int, namespace string, readonly bool, pebbleExtraOptions *pebble.ExtraOptions) (ethdb.Database, error) {
n.lock.Lock()
defer n.lock.Unlock()
if n.state == closedState {
Expand All @@ -756,12 +761,13 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r
db = rawdb.NewMemoryDatabase()
} else {
db, err = rawdb.Open(rawdb.OpenOptions{
Type: n.config.DBEngine,
Directory: n.ResolvePath(name),
Namespace: namespace,
Cache: cache,
Handles: handles,
ReadOnly: readonly,
Type: n.config.DBEngine,
Directory: n.ResolvePath(name),
Namespace: namespace,
Cache: cache,
Handles: handles,
ReadOnly: readonly,
PebbleExtraOptions: pebbleExtraOptions,
})
}

Expand All @@ -777,6 +783,10 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r
// database to immutable append-only files. If the node is an ephemeral one, a
// memory database is returned.
func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient string, namespace string, readonly bool) (ethdb.Database, error) {
return n.OpenDatabaseWithFreezerWithExtraOptions(name, cache, handles, ancient, namespace, readonly, nil)
}

func (n *Node) OpenDatabaseWithFreezerWithExtraOptions(name string, cache, handles int, ancient string, namespace string, readonly bool, pebbleExtraOptions *pebble.ExtraOptions) (ethdb.Database, error) {
n.lock.Lock()
defer n.lock.Unlock()
if n.state == closedState {
Expand All @@ -788,13 +798,14 @@ func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient
db = rawdb.NewMemoryDatabase()
} else {
db, err = rawdb.Open(rawdb.OpenOptions{
Type: n.config.DBEngine,
Directory: n.ResolvePath(name),
AncientsDirectory: n.ResolveAncient(name, ancient),
Namespace: namespace,
Cache: cache,
Handles: handles,
ReadOnly: readonly,
Type: n.config.DBEngine,
Directory: n.ResolvePath(name),
AncientsDirectory: n.ResolveAncient(name, ancient),
Namespace: namespace,
Cache: cache,
Handles: handles,
ReadOnly: readonly,
PebbleExtraOptions: pebbleExtraOptions,
})
}

Expand Down

0 comments on commit 935cb21

Please sign in to comment.