diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 92b0d80afc..3aa099c0f8 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -357,8 +357,8 @@ func NewLevelDBDatabase(file string, cache int, handles int, namespace string, r // NewPebbleDBDatabase creates a persistent key-value database without a freezer // moving immutable chain segments into cold storage. -func NewPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly, ephemeral bool) (ethdb.Database, error) { - db, err := pebble.New(file, cache, handles, namespace, readonly, ephemeral) +func NewPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly, ephemeral bool, extraOptions *pebble.ExtraOptions) (ethdb.Database, error) { + db, err := pebble.New(file, cache, handles, namespace, readonly, ephemeral, extraOptions) if err != nil { return nil, err } @@ -399,6 +399,8 @@ type OpenOptions struct { // Ephemeral means that filesystem sync operations should be avoided: data integrity in the face of // a crash is not important. This option should typically be used in tests. Ephemeral bool + + PebbleExtraOptions *pebble.ExtraOptions } // openKeyValueDatabase opens a disk-based key-value database, e.g. leveldb or pebble. @@ -420,7 +422,7 @@ func openKeyValueDatabase(o OpenOptions) (ethdb.Database, error) { } if o.Type == dbPebble || existingDb == dbPebble { log.Info("Using pebble as the backing database") - return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral) + return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral, o.PebbleExtraOptions) } if o.Type == dbLeveldb || existingDb == dbLeveldb { log.Info("Using leveldb as the backing database") @@ -428,7 +430,7 @@ func openKeyValueDatabase(o OpenOptions) (ethdb.Database, error) { } // No pre-existing database, no user-requested one either. Default to Pebble. log.Info("Defaulting to pebble as the backing database") - return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral) + return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral, o.PebbleExtraOptions) } // Open opens both a disk-based key-value database such as leveldb or pebble, but also diff --git a/ethdb/pebble/extraoptions.go b/ethdb/pebble/extraoptions.go new file mode 100644 index 0000000000..787167c1cc --- /dev/null +++ b/ethdb/pebble/extraoptions.go @@ -0,0 +1,35 @@ +package pebble + +import "time" + +type ExtraOptions struct { + BytesPerSync int + L0CompactionFileThreshold int + L0CompactionThreshold int + L0StopWritesThreshold int + LBaseMaxBytes int64 + MemTableStopWritesThreshold int + MaxConcurrentCompactions func() int + DisableAutomaticCompactions bool + WALBytesPerSync int + WALDir string + WALMinSyncInterval func() time.Duration + TargetByteDeletionRate int + Experimental ExtraOptionsExperimental + Levels []ExtraLevelOptions +} + +type ExtraOptionsExperimental struct { + L0CompactionConcurrency int + CompactionDebtConcurrency uint64 + ReadCompactionRate int64 + ReadSamplingMultiplier int64 + MaxWriterConcurrency int + ForceWriterParallelism bool +} + +type ExtraLevelOptions struct { + BlockSize int + IndexBlockSize int + TargetFileSize int64 +} diff --git a/ethdb/pebble/pebble.go b/ethdb/pebble/pebble.go index 8d1f91964b..ab6bb1d72d 100644 --- a/ethdb/pebble/pebble.go +++ b/ethdb/pebble/pebble.go @@ -70,6 +70,25 @@ type Database struct { seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt manualMemAllocGauge metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated + compDebtGauge metrics.Gauge + compInProgressGauge metrics.Gauge + + commitCountMeter metrics.Meter + commitTotalDurationMeter metrics.Meter + commitSemaphoreWaitMeter metrics.Meter + commitMemTableWriteStallMeter metrics.Meter + commitL0ReadAmpWriteStallMeter metrics.Meter + commitWALRotationMeter metrics.Meter + commitWaitMeter metrics.Meter + + commitCount atomic.Int64 + commitTotalDuration atomic.Int64 + commitSemaphoreWait atomic.Int64 + commitMemTableWriteStall atomic.Int64 + commitL0ReadAmpWriteStall atomic.Int64 + commitWALRotation atomic.Int64 + commitWait atomic.Int64 + levelsGauge []metrics.Gauge // Gauge for tracking the number of tables in levels quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag @@ -137,7 +156,38 @@ func (l panicLogger) Fatalf(format string, args ...interface{}) { // New returns a wrapped pebble DB object. The namespace is the prefix that the // metrics reporting should use for surfacing internal stats. -func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool) (*Database, error) { +func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool, extraOptions *ExtraOptions) (*Database, error) { + if extraOptions == nil { + extraOptions = &ExtraOptions{} + } + if extraOptions.MemTableStopWritesThreshold <= 0 { + extraOptions.MemTableStopWritesThreshold = 2 + } + if extraOptions.MaxConcurrentCompactions == nil { + extraOptions.MaxConcurrentCompactions = func() int { return runtime.NumCPU() } + } + var levels []pebble.LevelOptions + if len(extraOptions.Levels) == 0 { + levels = []pebble.LevelOptions{ + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + } + } else { + for _, level := range extraOptions.Levels { + levels = append(levels, pebble.LevelOptions{ + BlockSize: level.BlockSize, + IndexBlockSize: level.IndexBlockSize, + TargetFileSize: level.TargetFileSize, + FilterPolicy: bloom.FilterPolicy(10), + }) + } + } + // Ensure we have some minimal caching and file guarantees if cache < minCache { cache = minCache @@ -162,7 +212,7 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e // Two memory tables is configured which is identical to leveldb, // including a frozen memory table and another live one. - memTableLimit := 2 + memTableLimit := extraOptions.MemTableStopWritesThreshold memTableSize := cache * 1024 * 1024 / 2 / memTableLimit // The memory table size is currently capped at maxMemTableSize-1 due to a @@ -200,19 +250,11 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e // The default compaction concurrency(1 thread), // Here use all available CPUs for faster compaction. - MaxConcurrentCompactions: func() int { return runtime.NumCPU() }, + MaxConcurrentCompactions: extraOptions.MaxConcurrentCompactions, - // Per-level options. Options for at least one level must be specified. The - // options for the last level are used for all subsequent levels. - Levels: []pebble.LevelOptions{ - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - }, + // Per-level extraOptions. Options for at least one level must be specified. The + // extraOptions for the last level are used for all subsequent levels. + Levels: levels, ReadOnly: readonly, EventListener: &pebble.EventListener{ CompactionBegin: db.onCompactionBegin, @@ -221,11 +263,31 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e WriteStallEnd: db.onWriteStallEnd, }, Logger: panicLogger{}, // TODO(karalabe): Delete when this is upstreamed in Pebble + + BytesPerSync: extraOptions.BytesPerSync, + L0CompactionFileThreshold: extraOptions.L0CompactionFileThreshold, + L0CompactionThreshold: extraOptions.L0CompactionThreshold, + L0StopWritesThreshold: extraOptions.L0StopWritesThreshold, + LBaseMaxBytes: extraOptions.LBaseMaxBytes, + DisableAutomaticCompactions: extraOptions.DisableAutomaticCompactions, + WALBytesPerSync: extraOptions.WALBytesPerSync, + WALDir: extraOptions.WALDir, + WALMinSyncInterval: extraOptions.WALMinSyncInterval, + TargetByteDeletionRate: extraOptions.TargetByteDeletionRate, } // Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130 // for more details. opt.Experimental.ReadSamplingMultiplier = -1 + if opt.Experimental.ReadSamplingMultiplier != 0 { + opt.Experimental.ReadSamplingMultiplier = extraOptions.Experimental.ReadSamplingMultiplier + } + opt.Experimental.L0CompactionConcurrency = extraOptions.Experimental.L0CompactionConcurrency + opt.Experimental.CompactionDebtConcurrency = extraOptions.Experimental.CompactionDebtConcurrency + opt.Experimental.ReadCompactionRate = extraOptions.Experimental.ReadCompactionRate + opt.Experimental.MaxWriterConcurrency = extraOptions.Experimental.MaxWriterConcurrency + opt.Experimental.ForceWriterParallelism = extraOptions.Experimental.ForceWriterParallelism + // Open the db and recover any potential corruptions innerDB, err := pebble.Open(file, opt) if err != nil { @@ -247,6 +309,17 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e db.seekCompGauge = metrics.GetOrRegisterGauge(namespace+"compact/seek", nil) db.manualMemAllocGauge = metrics.GetOrRegisterGauge(namespace+"memory/manualalloc", nil) + db.compDebtGauge = metrics.GetOrRegisterGauge(namespace+"compact/debt", nil) + db.compInProgressGauge = metrics.GetOrRegisterGauge(namespace+"compact/inprogress", nil) + + db.commitCountMeter = metrics.GetOrRegisterMeter(namespace+"commit/counter", nil) + db.commitTotalDurationMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/total", nil) + db.commitSemaphoreWaitMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/semaphorewait", nil) + db.commitMemTableWriteStallMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/memtablewritestall", nil) + db.commitL0ReadAmpWriteStallMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/l0readampwritestall", nil) + db.commitWALRotationMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/walrotation", nil) + db.commitWaitMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/commitwait", nil) + // Start up the metrics gathering and return go db.meter(metricsGatheringInterval, namespace) return db, nil @@ -459,6 +532,14 @@ func (d *Database) meter(refresh time.Duration, namespace string) { compReads [2]int64 nWrites [2]int64 + + commitCounts [2]int64 + commitTotalDurations [2]int64 + commitSemaphoreWaits [2]int64 + commitMemTableWriteStalls [2]int64 + commitL0ReadAmpWriteStalls [2]int64 + commitWALRotations [2]int64 + commitWaits [2]int64 ) // Iterate ad infinitum and collect the stats @@ -474,6 +555,14 @@ func (d *Database) meter(refresh time.Duration, namespace string) { writeDelayTime = d.writeDelayTime.Load() nonLevel0CompCount = int64(d.nonLevel0Comp.Load()) level0CompCount = int64(d.level0Comp.Load()) + + commitCount = d.commitCount.Load() + commitTotalDuration = d.commitTotalDuration.Load() + commitSemaphoreWait = d.commitSemaphoreWait.Load() + commitMemTableWriteStall = d.commitMemTableWriteStall.Load() + commitL0ReadAmpWriteStall = d.commitL0ReadAmpWriteStall.Load() + commitWALRotation = d.commitWALRotation.Load() + commitWait = d.commitWait.Load() ) writeDelayTimes[i%2] = writeDelayTime writeDelayCounts[i%2] = writeDelayCount @@ -524,6 +613,25 @@ func (d *Database) meter(refresh time.Duration, namespace string) { d.level0CompGauge.Update(level0CompCount) d.seekCompGauge.Update(stats.Compact.ReadCount) + commitCounts[i%2] = commitCount + commitTotalDurations[i%2] = commitTotalDuration + commitSemaphoreWaits[i%2] = commitSemaphoreWait + commitMemTableWriteStalls[i%2] = commitMemTableWriteStall + commitL0ReadAmpWriteStalls[i%2] = commitL0ReadAmpWriteStall + commitWALRotations[i%2] = commitWALRotation + commitWaits[i%2] = commitWait + + d.commitCountMeter.Mark(commitCounts[i%2] - commitCounts[(i-1)%2]) + d.commitTotalDurationMeter.Mark(commitTotalDurations[i%2] - commitTotalDurations[(i-1)%2]) + d.commitSemaphoreWaitMeter.Mark(commitSemaphoreWaits[i%2] - commitSemaphoreWaits[(i-1)%2]) + d.commitMemTableWriteStallMeter.Mark(commitMemTableWriteStalls[i%2] - commitMemTableWriteStalls[(i-1)%2]) + d.commitL0ReadAmpWriteStallMeter.Mark(commitL0ReadAmpWriteStalls[i%2] - commitL0ReadAmpWriteStalls[(i-1)%2]) + d.commitWALRotationMeter.Mark(commitWALRotations[i%2] - commitWALRotations[(i-1)%2]) + d.commitWaitMeter.Mark(commitWaits[i%2] - commitWaits[(i-1)%2]) + + d.compDebtGauge.Update(int64(stats.Compact.EstimatedDebt)) + d.compInProgressGauge.Update(stats.Compact.NumInProgress) + for i, level := range stats.Levels { // Append metrics for additional layers if i >= len(d.levelsGauge) { @@ -578,7 +686,20 @@ func (b *batch) Write() error { if b.db.closed { return pebble.ErrClosed } - return b.b.Commit(b.db.writeOptions) + err := b.b.Commit(b.db.writeOptions) + if err != nil { + return err + } + stats := b.b.CommitStats() + b.db.commitCount.Add(1) + b.db.commitTotalDuration.Add(int64(stats.TotalDuration)) + b.db.commitSemaphoreWait.Add(int64(stats.SemaphoreWaitDuration)) + b.db.commitMemTableWriteStall.Add(int64(stats.MemTableWriteStallDuration)) + b.db.commitL0ReadAmpWriteStall.Add(int64(stats.L0ReadAmpWriteStallDuration)) + b.db.commitWALRotation.Add(int64(stats.WALRotationDuration)) + b.db.commitWait.Add(int64(stats.CommitWaitDuration)) + // TODO add metric for stats.WALQueueWaitDuration when it will be used by pebble (currently it is always 0) + return nil } // Reset resets the batch for reuse. diff --git a/ethdb/pebble/pebble_non64bit.go b/ethdb/pebble/pebble_non64bit.go index b028c7e2e9..bd503aadea 100644 --- a/ethdb/pebble/pebble_non64bit.go +++ b/ethdb/pebble/pebble_non64bit.go @@ -8,6 +8,6 @@ import ( "github.com/ethereum/go-ethereum/ethdb" ) -func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool) (ethdb.Database, error) { +func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool, extraOptions *ExtraOptions) (ethdb.Database, error) { return nil, errors.New("pebble is not supported on this platform") } diff --git a/node/node.go b/node/node.go index 20aad14e90..d8a2905787 100644 --- a/node/node.go +++ b/node/node.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/ethdb/pebble" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" @@ -746,6 +747,10 @@ func (n *Node) EventMux() *event.TypeMux { // previous can be found) from within the node's instance directory. If the node is // ephemeral, a memory database is returned. func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, readonly bool) (ethdb.Database, error) { + return n.OpenDatabaseWithExtraOptions(name, cache, handles, namespace, readonly, nil) +} + +func (n *Node) OpenDatabaseWithExtraOptions(name string, cache, handles int, namespace string, readonly bool, pebbleExtraOptions *pebble.ExtraOptions) (ethdb.Database, error) { n.lock.Lock() defer n.lock.Unlock() if n.state == closedState { @@ -758,12 +763,13 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r db = rawdb.NewMemoryDatabase() } else { db, err = rawdb.Open(rawdb.OpenOptions{ - Type: n.config.DBEngine, - Directory: n.ResolvePath(name), - Namespace: namespace, - Cache: cache, - Handles: handles, - ReadOnly: readonly, + Type: n.config.DBEngine, + Directory: n.ResolvePath(name), + Namespace: namespace, + Cache: cache, + Handles: handles, + ReadOnly: readonly, + PebbleExtraOptions: pebbleExtraOptions, }) } @@ -779,6 +785,10 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r // database to immutable append-only files. If the node is an ephemeral one, a // memory database is returned. func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient string, namespace string, readonly bool) (ethdb.Database, error) { + return n.OpenDatabaseWithFreezerWithExtraOptions(name, cache, handles, ancient, namespace, readonly, nil) +} + +func (n *Node) OpenDatabaseWithFreezerWithExtraOptions(name string, cache, handles int, ancient string, namespace string, readonly bool, pebbleExtraOptions *pebble.ExtraOptions) (ethdb.Database, error) { n.lock.Lock() defer n.lock.Unlock() if n.state == closedState { @@ -790,13 +800,14 @@ func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient db = rawdb.NewMemoryDatabase() } else { db, err = rawdb.Open(rawdb.OpenOptions{ - Type: n.config.DBEngine, - Directory: n.ResolvePath(name), - AncientsDirectory: n.ResolveAncient(name, ancient), - Namespace: namespace, - Cache: cache, - Handles: handles, - ReadOnly: readonly, + Type: n.config.DBEngine, + Directory: n.ResolvePath(name), + AncientsDirectory: n.ResolveAncient(name, ancient), + Namespace: namespace, + Cache: cache, + Handles: handles, + ReadOnly: readonly, + PebbleExtraOptions: pebbleExtraOptions, }) } if err == nil {