diff --git a/cmd/geth/main.go b/cmd/geth/main.go index d987364ca1..4a8d7abd21 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -160,7 +160,9 @@ var ( configFileFlag, utils.LogDebugFlag, utils.LogBacktraceAtFlag, - }, utils.NetworkFlags, utils.DatabaseFlags) + }, utils.NetworkFlags, utils.DatabaseFlags, + utils.PebbleFlags, + ) rpcFlags = []cli.Flag{ utils.HTTPEnabledFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index f4ecd35287..7be5e5a410 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1720,6 +1720,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { setRequiredBlocks(ctx, cfg) setLes(ctx, cfg) + setPebbleExtraOptions(ctx, cfg) + log.Debug("Set pebble extra options", "pebble_extra_options", cfg.PebbleExtraOptions) + // Cap the cache allowance and tune the garbage collector mem, err := gopsutil.VirtualMemory() if err == nil { @@ -1947,7 +1950,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if rawdb.ReadCanonicalHash(chaindb, 0) != (common.Hash{}) { cfg.Genesis = nil // fallback to db content - //validate genesis has PoS enabled in block 0 + // validate genesis has PoS enabled in block 0 genesis, err := core.ReadGenesis(chaindb) if err != nil { Fatalf("Could not read genesis from database: %v", err) diff --git a/cmd/utils/pebble_extra.go b/cmd/utils/pebble_extra.go new file mode 100644 index 0000000000..ae6e089b4e --- /dev/null +++ b/cmd/utils/pebble_extra.go @@ -0,0 +1,176 @@ +package utils + +import ( + "time" + + "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/ethdb/pebble" + "github.com/ethereum/go-ethereum/internal/flags" + "github.com/urfave/cli/v2" +) + +var ( + PebbleBytesPerSyncFlag = &cli.IntFlag{ + Name: "pebble.bytes-per-sync", + Category: flags.PebbleCategory, + } + PebbleL0CompactionFileThresholdFlag = &cli.IntFlag{ + Name: "pebble.l0-compaction-file-threshold", + Category: flags.PebbleCategory, + } + PebbleL0CompactionThresholdFlag = &cli.IntFlag{ + Name: "pebble.l0-compaction-threshold", + Category: flags.PebbleCategory, + } + PebbleL0StopWritesThresholdFlag = &cli.IntFlag{ + Name: "pebble.l0-stop-writes-threshold", + Category: flags.PebbleCategory, + } + PebbleLBaseMaxBytesFlag = &cli.Int64Flag{ + Name: "pebble.l-base-max-bytes", + Category: flags.PebbleCategory, + } + PebbleMemTableStopWritesThresholdFlag = &cli.IntFlag{ + Name: "pebble.mem-table-stop-writes-threshold", + Category: flags.PebbleCategory, + } + PebbleMaxConcurrentCompactionsFlag = &cli.IntFlag{ + Name: "pebble.max-concurrent-compactions", + Category: flags.PebbleCategory, + } + PebbleDisableAutomaticCompactionsFlag = &cli.BoolFlag{ + Name: "pebble.disable-automatic-compactions", + Category: flags.PebbleCategory, + } + PebbleWALBytesPerSyncFlag = &cli.IntFlag{ + Name: "pebble.wal-bytes-per-sync", + Category: flags.PebbleCategory, + } + PebbleWALDirFlag = &cli.StringFlag{ + Name: "pebble.wal-dir", + Category: flags.PebbleCategory, + } + PebbleWALMinSyncIntervalFlag = &cli.DurationFlag{ + Name: "pebble.wal-min-sync-interval", + Category: flags.PebbleCategory, + } + PebbleTargetByteDeletionRateFlag = &cli.IntFlag{ + Name: "pebble.target-byte-deletion-rate", + Category: flags.PebbleCategory, + } + + // TODO: PebbleLevelOptions + + // Experimental + + PebbleL0CompactionConcurrencyFlag = &cli.IntFlag{ + Name: "pebble.l0-compaction-concurrency", + Category: flags.PebbleCategory, + } + PebbleCompactionDebtConcurrencyFlag = &cli.Uint64Flag{ + Name: "pebble.compaction-debt-concurrency", + Category: flags.PebbleCategory, + } + PebbleReadCompactionRateFlag = &cli.Int64Flag{ + Name: "pebble.read-compaction-rate", + Category: flags.PebbleCategory, + } + PebbleReadSamplingMultiplierFlag = &cli.Int64Flag{ + Name: "pebble.read-sampling-multiplier", + Category: flags.PebbleCategory, + } + PebbleMaxWriterConcurrencyFlag = &cli.IntFlag{ + Name: "pebble.max-writer-concurrency", + Category: flags.PebbleCategory, + } + PebbleForceWriterParallelismFlag = &cli.BoolFlag{ + Name: "pebble.force-writer-parallelism", + Category: flags.PebbleCategory, + } + + PebbleFlags = []cli.Flag{ + PebbleBytesPerSyncFlag, + PebbleL0CompactionFileThresholdFlag, + PebbleL0CompactionThresholdFlag, + PebbleL0StopWritesThresholdFlag, + PebbleLBaseMaxBytesFlag, + PebbleMemTableStopWritesThresholdFlag, + PebbleMaxConcurrentCompactionsFlag, + PebbleDisableAutomaticCompactionsFlag, + PebbleWALBytesPerSyncFlag, + PebbleWALDirFlag, + PebbleWALMinSyncIntervalFlag, + PebbleTargetByteDeletionRateFlag, + // Experimental + PebbleL0CompactionConcurrencyFlag, + PebbleCompactionDebtConcurrencyFlag, + PebbleReadCompactionRateFlag, + PebbleReadSamplingMultiplierFlag, + PebbleMaxWriterConcurrencyFlag, + PebbleForceWriterParallelismFlag, + } +) + +func setPebbleExtraOptions(ctx *cli.Context, cfg *ethconfig.Config) { + peos := new(pebble.ExtraOptions) + + if flag := PebbleBytesPerSyncFlag.Name; ctx.IsSet(flag) { + peos.BytesPerSync = ctx.Int(flag) + } + if flag := PebbleL0CompactionFileThresholdFlag.Name; ctx.IsSet(flag) { + peos.L0CompactionFileThreshold = ctx.Int(flag) + } + if flag := PebbleL0CompactionThresholdFlag.Name; ctx.IsSet(flag) { + peos.L0CompactionThreshold = ctx.Int(flag) + } + if flag := PebbleL0StopWritesThresholdFlag.Name; ctx.IsSet(flag) { + peos.L0StopWritesThreshold = ctx.Int(flag) + } + if flag := PebbleLBaseMaxBytesFlag.Name; ctx.IsSet(flag) { + peos.LBaseMaxBytes = ctx.Int64(flag) + } + if flag := PebbleMemTableStopWritesThresholdFlag.Name; ctx.IsSet(flag) { + peos.MemTableStopWritesThreshold = ctx.Int(flag) + } + if flag := PebbleMaxConcurrentCompactionsFlag.Name; ctx.IsSet(flag) { + peos.MaxConcurrentCompactions = func() int { return ctx.Int(flag) } + } + if flag := PebbleDisableAutomaticCompactionsFlag.Name; ctx.IsSet(flag) { + peos.DisableAutomaticCompactions = ctx.Bool(flag) + } + if flag := PebbleWALBytesPerSyncFlag.Name; ctx.IsSet(flag) { + peos.WALBytesPerSync = ctx.Int(flag) + } + if flag := PebbleWALDirFlag.Name; ctx.IsSet(flag) { + peos.WALDir = ctx.String(flag) + } + if flag := PebbleWALMinSyncIntervalFlag.Name; ctx.IsSet(flag) { + peos.WALMinSyncInterval = func() time.Duration { return ctx.Duration(flag) } + } + if flag := PebbleTargetByteDeletionRateFlag.Name; ctx.IsSet(flag) { + peos.TargetByteDeletionRate = ctx.Int(flag) + } + + // Experimental + + if flag := PebbleL0CompactionConcurrencyFlag.Name; ctx.IsSet(flag) { + peos.Experimental.L0CompactionConcurrency = ctx.Int(flag) + } + if flag := PebbleCompactionDebtConcurrencyFlag.Name; ctx.IsSet(flag) { + peos.Experimental.CompactionDebtConcurrency = ctx.Uint64(flag) + } + if flag := PebbleReadCompactionRateFlag.Name; ctx.IsSet(flag) { + peos.Experimental.ReadCompactionRate = ctx.Int64(flag) + } + if flag := PebbleReadSamplingMultiplierFlag.Name; ctx.IsSet(flag) { + peos.Experimental.ReadSamplingMultiplier = ctx.Int64(flag) + } + if flag := PebbleMaxWriterConcurrencyFlag.Name; ctx.IsSet(flag) { + peos.Experimental.MaxWriterConcurrency = ctx.Int(flag) + } + if flag := PebbleForceWriterParallelismFlag.Name; ctx.IsSet(flag) { + peos.Experimental.ForceWriterParallelism = ctx.Bool(flag) + } + + cfg.PebbleExtraOptions = peos +} diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 27a9ec7412..29a3ec1615 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -324,8 +324,8 @@ func NewLevelDBDatabase(file string, cache int, handles int, namespace string, r // NewPebbleDBDatabase creates a persistent key-value database without a freezer // moving immutable chain segments into cold storage. -func NewPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly, ephemeral bool) (ethdb.Database, error) { - db, err := pebble.New(file, cache, handles, namespace, readonly, ephemeral) +func NewPebbleDBDatabase(file string, cache int, handles int, namespace string, readonly, ephemeral bool, extraOptions *pebble.ExtraOptions) (ethdb.Database, error) { + db, err := pebble.New(file, cache, handles, namespace, readonly, ephemeral, extraOptions) if err != nil { return nil, err } @@ -366,6 +366,8 @@ type OpenOptions struct { // Ephemeral means that filesystem sync operations should be avoided: data integrity in the face of // a crash is not important. This option should typically be used in tests. Ephemeral bool + + PebbleExtraOptions *pebble.ExtraOptions } // openKeyValueDatabase opens a disk-based key-value database, e.g. leveldb or pebble. @@ -387,7 +389,7 @@ func openKeyValueDatabase(o OpenOptions) (ethdb.Database, error) { } if o.Type == dbPebble || existingDb == dbPebble { log.Info("Using pebble as the backing database") - return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral) + return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral, o.PebbleExtraOptions) } if o.Type == dbLeveldb || existingDb == dbLeveldb { log.Info("Using leveldb as the backing database") @@ -395,7 +397,7 @@ func openKeyValueDatabase(o OpenOptions) (ethdb.Database, error) { } // No pre-existing database, no user-requested one either. Default to Pebble. log.Info("Defaulting to pebble as the backing database") - return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral) + return NewPebbleDBDatabase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral, o.PebbleExtraOptions) } // Open opens both a disk-based key-value database such as leveldb or pebble, but also diff --git a/eth/backend.go b/eth/backend.go index aedbd01634..1ee8e9c0c3 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -136,7 +136,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024) // Assemble the Ethereum object - chainDb, err := stack.OpenDatabaseWithFreezer("chaindata", config.DatabaseCache, config.DatabaseHandles, config.DatabaseFreezer, "eth/db/chaindata/", false) + chainDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("chaindata", config.DatabaseCache, config.DatabaseHandles, config.DatabaseFreezer, "eth/db/chaindata/", false, config.PebbleExtraOptions) if err != nil { return nil, err } diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index fa1bfd4642..40e4bd2aa6 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/ethdb/pebble" "github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/params" ) @@ -178,6 +179,8 @@ type Config struct { RollupDisableTxPoolGossip bool RollupDisableTxPoolAdmission bool RollupHaltOnIncompatibleProtocolVersion string + + PebbleExtraOptions *pebble.ExtraOptions } // CreateConsensusEngine creates a consensus engine for the given chain config. diff --git a/ethdb/pebble/extraoptions.go b/ethdb/pebble/extraoptions.go new file mode 100644 index 0000000000..787167c1cc --- /dev/null +++ b/ethdb/pebble/extraoptions.go @@ -0,0 +1,35 @@ +package pebble + +import "time" + +type ExtraOptions struct { + BytesPerSync int + L0CompactionFileThreshold int + L0CompactionThreshold int + L0StopWritesThreshold int + LBaseMaxBytes int64 + MemTableStopWritesThreshold int + MaxConcurrentCompactions func() int + DisableAutomaticCompactions bool + WALBytesPerSync int + WALDir string + WALMinSyncInterval func() time.Duration + TargetByteDeletionRate int + Experimental ExtraOptionsExperimental + Levels []ExtraLevelOptions +} + +type ExtraOptionsExperimental struct { + L0CompactionConcurrency int + CompactionDebtConcurrency uint64 + ReadCompactionRate int64 + ReadSamplingMultiplier int64 + MaxWriterConcurrency int + ForceWriterParallelism bool +} + +type ExtraLevelOptions struct { + BlockSize int + IndexBlockSize int + TargetFileSize int64 +} diff --git a/ethdb/pebble/pebble.go b/ethdb/pebble/pebble.go index af4686cf5b..e091d4543a 100644 --- a/ethdb/pebble/pebble.go +++ b/ethdb/pebble/pebble.go @@ -68,6 +68,25 @@ type Database struct { seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt manualMemAllocGauge metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated + compDebtGauge metrics.Gauge + compInProgressGauge metrics.Gauge + + commitCountMeter metrics.Meter + commitTotalDurationMeter metrics.Meter + commitSemaphoreWaitMeter metrics.Meter + commitMemTableWriteStallMeter metrics.Meter + commitL0ReadAmpWriteStallMeter metrics.Meter + commitWALRotationMeter metrics.Meter + commitWaitMeter metrics.Meter + + commitCount atomic.Int64 + commitTotalDuration atomic.Int64 + commitSemaphoreWait atomic.Int64 + commitMemTableWriteStall atomic.Int64 + commitL0ReadAmpWriteStall atomic.Int64 + commitWALRotation atomic.Int64 + commitWait atomic.Int64 + levelsGauge []metrics.Gauge // Gauge for tracking the number of tables in levels quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag @@ -135,7 +154,38 @@ func (l panicLogger) Fatalf(format string, args ...interface{}) { // New returns a wrapped pebble DB object. The namespace is the prefix that the // metrics reporting should use for surfacing internal stats. -func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool) (*Database, error) { +func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool, extraOptions *ExtraOptions) (*Database, error) { + if extraOptions == nil { + extraOptions = &ExtraOptions{} + } + if extraOptions.MemTableStopWritesThreshold <= 0 { + extraOptions.MemTableStopWritesThreshold = 2 + } + if extraOptions.MaxConcurrentCompactions == nil { + extraOptions.MaxConcurrentCompactions = func() int { return runtime.NumCPU() } + } + var levels []pebble.LevelOptions + if len(extraOptions.Levels) == 0 { + levels = []pebble.LevelOptions{ + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, + } + } else { + for _, level := range extraOptions.Levels { + levels = append(levels, pebble.LevelOptions{ + BlockSize: level.BlockSize, + IndexBlockSize: level.IndexBlockSize, + TargetFileSize: level.TargetFileSize, + FilterPolicy: bloom.FilterPolicy(10), + }) + } + } + // Ensure we have some minimal caching and file guarantees if cache < minCache { cache = minCache @@ -160,7 +210,7 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e // Two memory tables is configured which is identical to leveldb, // including a frozen memory table and another live one. - memTableLimit := 2 + memTableLimit := extraOptions.MemTableStopWritesThreshold memTableSize := cache * 1024 * 1024 / 2 / memTableLimit // The memory table size is currently capped at maxMemTableSize-1 due to a @@ -198,19 +248,11 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e // The default compaction concurrency(1 thread), // Here use all available CPUs for faster compaction. - MaxConcurrentCompactions: func() int { return runtime.NumCPU() }, + MaxConcurrentCompactions: extraOptions.MaxConcurrentCompactions, - // Per-level options. Options for at least one level must be specified. The - // options for the last level are used for all subsequent levels. - Levels: []pebble.LevelOptions{ - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - {TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)}, - }, + // Per-level extraOptions. Options for at least one level must be specified. The + // extraOptions for the last level are used for all subsequent levels. + Levels: levels, ReadOnly: readonly, EventListener: &pebble.EventListener{ CompactionBegin: db.onCompactionBegin, @@ -219,11 +261,31 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e WriteStallEnd: db.onWriteStallEnd, }, Logger: panicLogger{}, // TODO(karalabe): Delete when this is upstreamed in Pebble + + BytesPerSync: extraOptions.BytesPerSync, + L0CompactionFileThreshold: extraOptions.L0CompactionFileThreshold, + L0CompactionThreshold: extraOptions.L0CompactionThreshold, + L0StopWritesThreshold: extraOptions.L0StopWritesThreshold, + LBaseMaxBytes: extraOptions.LBaseMaxBytes, + DisableAutomaticCompactions: extraOptions.DisableAutomaticCompactions, + WALBytesPerSync: extraOptions.WALBytesPerSync, + WALDir: extraOptions.WALDir, + WALMinSyncInterval: extraOptions.WALMinSyncInterval, + TargetByteDeletionRate: extraOptions.TargetByteDeletionRate, } // Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130 // for more details. opt.Experimental.ReadSamplingMultiplier = -1 + if opt.Experimental.ReadSamplingMultiplier != 0 { + opt.Experimental.ReadSamplingMultiplier = extraOptions.Experimental.ReadSamplingMultiplier + } + opt.Experimental.L0CompactionConcurrency = extraOptions.Experimental.L0CompactionConcurrency + opt.Experimental.CompactionDebtConcurrency = extraOptions.Experimental.CompactionDebtConcurrency + opt.Experimental.ReadCompactionRate = extraOptions.Experimental.ReadCompactionRate + opt.Experimental.MaxWriterConcurrency = extraOptions.Experimental.MaxWriterConcurrency + opt.Experimental.ForceWriterParallelism = extraOptions.Experimental.ForceWriterParallelism + // Open the db and recover any potential corruptions innerDB, err := pebble.Open(file, opt) if err != nil { @@ -245,6 +307,17 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e db.seekCompGauge = metrics.NewRegisteredGauge(namespace+"compact/seek", nil) db.manualMemAllocGauge = metrics.NewRegisteredGauge(namespace+"memory/manualalloc", nil) + db.compDebtGauge = metrics.GetOrRegisterGauge(namespace+"compact/debt", nil) + db.compInProgressGauge = metrics.GetOrRegisterGauge(namespace+"compact/inprogress", nil) + + db.commitCountMeter = metrics.GetOrRegisterMeter(namespace+"commit/counter", nil) + db.commitTotalDurationMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/total", nil) + db.commitSemaphoreWaitMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/semaphorewait", nil) + db.commitMemTableWriteStallMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/memtablewritestall", nil) + db.commitL0ReadAmpWriteStallMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/l0readampwritestall", nil) + db.commitWALRotationMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/walrotation", nil) + db.commitWaitMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/commitwait", nil) + // Start up the metrics gathering and return go db.meter(metricsGatheringInterval, namespace) return db, nil @@ -457,6 +530,14 @@ func (d *Database) meter(refresh time.Duration, namespace string) { compReads [2]int64 nWrites [2]int64 + + commitCounts [2]int64 + commitTotalDurations [2]int64 + commitSemaphoreWaits [2]int64 + commitMemTableWriteStalls [2]int64 + commitL0ReadAmpWriteStalls [2]int64 + commitWALRotations [2]int64 + commitWaits [2]int64 ) // Iterate ad infinitum and collect the stats @@ -472,6 +553,14 @@ func (d *Database) meter(refresh time.Duration, namespace string) { writeDelayTime = d.writeDelayTime.Load() nonLevel0CompCount = int64(d.nonLevel0Comp.Load()) level0CompCount = int64(d.level0Comp.Load()) + + commitCount = d.commitCount.Load() + commitTotalDuration = d.commitTotalDuration.Load() + commitSemaphoreWait = d.commitSemaphoreWait.Load() + commitMemTableWriteStall = d.commitMemTableWriteStall.Load() + commitL0ReadAmpWriteStall = d.commitL0ReadAmpWriteStall.Load() + commitWALRotation = d.commitWALRotation.Load() + commitWait = d.commitWait.Load() ) writeDelayTimes[i%2] = writeDelayTime writeDelayCounts[i%2] = writeDelayCount @@ -522,6 +611,25 @@ func (d *Database) meter(refresh time.Duration, namespace string) { d.level0CompGauge.Update(level0CompCount) d.seekCompGauge.Update(stats.Compact.ReadCount) + commitCounts[i%2] = commitCount + commitTotalDurations[i%2] = commitTotalDuration + commitSemaphoreWaits[i%2] = commitSemaphoreWait + commitMemTableWriteStalls[i%2] = commitMemTableWriteStall + commitL0ReadAmpWriteStalls[i%2] = commitL0ReadAmpWriteStall + commitWALRotations[i%2] = commitWALRotation + commitWaits[i%2] = commitWait + + d.commitCountMeter.Mark(commitCounts[i%2] - commitCounts[(i-1)%2]) + d.commitTotalDurationMeter.Mark(commitTotalDurations[i%2] - commitTotalDurations[(i-1)%2]) + d.commitSemaphoreWaitMeter.Mark(commitSemaphoreWaits[i%2] - commitSemaphoreWaits[(i-1)%2]) + d.commitMemTableWriteStallMeter.Mark(commitMemTableWriteStalls[i%2] - commitMemTableWriteStalls[(i-1)%2]) + d.commitL0ReadAmpWriteStallMeter.Mark(commitL0ReadAmpWriteStalls[i%2] - commitL0ReadAmpWriteStalls[(i-1)%2]) + d.commitWALRotationMeter.Mark(commitWALRotations[i%2] - commitWALRotations[(i-1)%2]) + d.commitWaitMeter.Mark(commitWaits[i%2] - commitWaits[(i-1)%2]) + + d.compDebtGauge.Update(int64(stats.Compact.EstimatedDebt)) + d.compInProgressGauge.Update(stats.Compact.NumInProgress) + for i, level := range stats.Levels { // Append metrics for additional layers if i >= len(d.levelsGauge) { @@ -576,7 +684,20 @@ func (b *batch) Write() error { if b.db.closed { return pebble.ErrClosed } - return b.b.Commit(b.db.writeOptions) + err := b.b.Commit(b.db.writeOptions) + if err != nil { + return err + } + stats := b.b.CommitStats() + b.db.commitCount.Add(1) + b.db.commitTotalDuration.Add(int64(stats.TotalDuration)) + b.db.commitSemaphoreWait.Add(int64(stats.SemaphoreWaitDuration)) + b.db.commitMemTableWriteStall.Add(int64(stats.MemTableWriteStallDuration)) + b.db.commitL0ReadAmpWriteStall.Add(int64(stats.L0ReadAmpWriteStallDuration)) + b.db.commitWALRotation.Add(int64(stats.WALRotationDuration)) + b.db.commitWait.Add(int64(stats.CommitWaitDuration)) + // TODO add metric for stats.WALQueueWaitDuration when it will be used by pebble (currently it is always 0) + return nil } // Reset resets the batch for reuse. diff --git a/internal/flags/categories.go b/internal/flags/categories.go index fe2e6d29d4..23d303e007 100644 --- a/internal/flags/categories.go +++ b/internal/flags/categories.go @@ -38,6 +38,8 @@ const ( MiscCategory = "MISC" TestingCategory = "TESTING" DeprecatedCategory = "ALIASED (deprecated)" + + PebbleCategory = "PEBBLE DB" ) func init() { diff --git a/node/node.go b/node/node.go index 4dc856c345..51fbce0c00 100644 --- a/node/node.go +++ b/node/node.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/ethdb/pebble" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" @@ -714,6 +715,10 @@ func (n *Node) EventMux() *event.TypeMux { // previous can be found) from within the node's instance directory. If the node is // ephemeral, a memory database is returned. func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, readonly bool) (ethdb.Database, error) { + return n.OpenDatabaseWithExtraOptions(name, cache, handles, namespace, readonly, nil) +} + +func (n *Node) OpenDatabaseWithExtraOptions(name string, cache, handles int, namespace string, readonly bool, pebbleExtraOptions *pebble.ExtraOptions) (ethdb.Database, error) { n.lock.Lock() defer n.lock.Unlock() if n.state == closedState { @@ -726,12 +731,13 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r db = rawdb.NewMemoryDatabase() } else { db, err = rawdb.Open(rawdb.OpenOptions{ - Type: n.config.DBEngine, - Directory: n.ResolvePath(name), - Namespace: namespace, - Cache: cache, - Handles: handles, - ReadOnly: readonly, + Type: n.config.DBEngine, + Directory: n.ResolvePath(name), + Namespace: namespace, + Cache: cache, + Handles: handles, + ReadOnly: readonly, + PebbleExtraOptions: pebbleExtraOptions, }) } @@ -747,6 +753,10 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r // database to immutable append-only files. If the node is an ephemeral one, a // memory database is returned. func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient string, namespace string, readonly bool) (ethdb.Database, error) { + return n.OpenDatabaseWithFreezerWithExtraOptions(name, cache, handles, ancient, namespace, readonly, nil) +} + +func (n *Node) OpenDatabaseWithFreezerWithExtraOptions(name string, cache, handles int, ancient string, namespace string, readonly bool, pebbleExtraOptions *pebble.ExtraOptions) (ethdb.Database, error) { n.lock.Lock() defer n.lock.Unlock() if n.state == closedState { @@ -758,13 +768,14 @@ func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient db = rawdb.NewMemoryDatabase() } else { db, err = rawdb.Open(rawdb.OpenOptions{ - Type: n.config.DBEngine, - Directory: n.ResolvePath(name), - AncientsDirectory: n.ResolveAncient(name, ancient), - Namespace: namespace, - Cache: cache, - Handles: handles, - ReadOnly: readonly, + Type: n.config.DBEngine, + Directory: n.ResolvePath(name), + AncientsDirectory: n.ResolveAncient(name, ancient), + Namespace: namespace, + Cache: cache, + Handles: handles, + ReadOnly: readonly, + PebbleExtraOptions: pebbleExtraOptions, }) }