diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 026bcf7e62..0d2ccb1d1b 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -495,6 +495,12 @@ var (
 		Usage:    "Disable heuristic state prefetch during block import (less CPU and disk IO, more time waiting for data)",
 		Category: flags.PerfCategory,
 	}
+	CachePrefetcherParallelismFlag = &cli.IntFlag{
+		Name:     "cache.prefetcher.parallelism",
+		Usage:    "Maximum number of concurrent disk reads the trie prefetcher should perform at once",
+		Value:    16,
+		Category: flags.PerfCategory,
+	}
 	CachePreimagesFlag = &cli.BoolFlag{
 		Name:     "cache.preimages",
 		Usage:    "Enable recording the SHA3/keccak preimages of trie keys",
@@ -2331,15 +2337,16 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readonly bool) (*core.BlockCh
 		Fatalf("%v", err)
 	}
 	cache := &core.CacheConfig{
-		TrieCleanLimit:      ethconfig.Defaults.TrieCleanCache,
-		TrieCleanNoPrefetch: ctx.Bool(CacheNoPrefetchFlag.Name),
-		TrieDirtyLimit:      ethconfig.Defaults.TrieDirtyCache,
-		TrieDirtyDisabled:   ctx.String(GCModeFlag.Name) == "archive",
-		TrieTimeLimit:       ethconfig.Defaults.TrieTimeout,
-		SnapshotLimit:       ethconfig.Defaults.SnapshotCache,
-		Preimages:           ctx.Bool(CachePreimagesFlag.Name),
-		StateScheme:         scheme,
-		StateHistory:        ctx.Uint64(StateHistoryFlag.Name),
+		TrieCleanLimit:            ethconfig.Defaults.TrieCleanCache,
+		TrieCleanNoPrefetch:       ctx.Bool(CacheNoPrefetchFlag.Name),
+		TrieDirtyLimit:            ethconfig.Defaults.TrieDirtyCache,
+		TrieDirtyDisabled:         ctx.String(GCModeFlag.Name) == "archive",
+		TrieTimeLimit:             ethconfig.Defaults.TrieTimeout,
+		TriePrefetcherParallelism: ctx.Int(CachePrefetcherParallelismFlag.Name),
+		SnapshotLimit:             ethconfig.Defaults.SnapshotCache,
+		Preimages:                 ctx.Bool(CachePreimagesFlag.Name),
+		StateScheme:               scheme,
+		StateHistory:              ctx.Uint64(StateHistoryFlag.Name),
 	}
 	if cache.TrieDirtyDisabled && !cache.Preimages {
 		cache.Preimages = true
diff --git a/core/blockchain.go b/core/blockchain.go
index bbf57c3ccb..019e187ea6 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -132,15 +132,16 @@ const (
 // CacheConfig contains the configuration values for the trie database
 // and state snapshot these are resident in a blockchain.
 type CacheConfig struct {
-	TrieCleanLimit      int           // Memory allowance (MB) to use for caching trie nodes in memory
-	TrieCleanNoPrefetch bool          // Whether to disable heuristic state prefetching for followup blocks
-	TrieDirtyLimit      int           // Memory limit (MB) at which to start flushing dirty trie nodes to disk
-	TrieDirtyDisabled   bool          // Whether to disable trie write caching and GC altogether (archive node)
-	TrieTimeLimit       time.Duration // Time limit after which to flush the current in-memory trie to disk
-	SnapshotLimit       int           // Memory allowance (MB) to use for caching snapshot entries in memory
-	Preimages           bool          // Whether to store preimage of trie key to the disk
-	StateHistory        uint64        // Number of blocks from head whose state histories are reserved.
-	StateScheme         string        // Scheme used to store ethereum states and merkle tree nodes on top
+	TrieCleanLimit            int           // Memory allowance (MB) to use for caching trie nodes in memory
+	TrieCleanNoPrefetch       bool          // Whether to disable heuristic state prefetching for followup blocks
+	TrieDirtyLimit            int           // Memory limit (MB) at which to start flushing dirty trie nodes to disk
+	TrieDirtyDisabled         bool          // Whether to disable trie write caching and GC altogether (archive node)
+	TriePrefetcherParallelism int           // Max concurrent disk reads the trie prefetcher should perform at once
+	TrieTimeLimit             time.Duration // Time limit after which to flush the current in-memory trie to disk
+	SnapshotLimit             int           // Memory allowance (MB) to use for caching snapshot entries in memory
+	Preimages                 bool          // Whether to store preimage of trie key to the disk
+	StateHistory              uint64        // Number of blocks from head whose state histories are reserved.
+	StateScheme               string        // Scheme used to store ethereum states and merkle tree nodes on top
 
 	SnapshotNoBuild bool // Whether the background generation is allowed
 	SnapshotWait    bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
@@ -170,12 +171,13 @@ func (c *CacheConfig) triedbConfig(isVerkle bool) *triedb.Config {
 // defaultCacheConfig are the default caching values if none are specified by the
 // user (also used during testing).
 var defaultCacheConfig = &CacheConfig{
-	TrieCleanLimit: 256,
-	TrieDirtyLimit: 256,
-	TrieTimeLimit:  5 * time.Minute,
-	SnapshotLimit:  256,
-	SnapshotWait:   true,
-	StateScheme:    rawdb.HashScheme,
+	TrieCleanLimit:            256,
+	TrieDirtyLimit:            256,
+	TriePrefetcherParallelism: 16,
+	TrieTimeLimit:             5 * time.Minute,
+	SnapshotLimit:             256,
+	SnapshotWait:              true,
+	StateScheme:               rawdb.HashScheme,
 }
 
 // DefaultCacheConfigWithScheme returns a deep copied default cache config with
@@ -1795,7 +1797,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
 					return nil, it.index, err
 				}
 			}
-			statedb.StartPrefetcher("chain", witness)
+			statedb.StartPrefetcher("chain", witness, bc.cacheConfig.TriePrefetcherParallelism)
 		}
 		activeState = statedb
 
@@ -2541,3 +2543,11 @@ func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) {
 func (bc *BlockChain) GetTrieFlushInterval() time.Duration {
 	return time.Duration(bc.flushInterval.Load())
 }
+
+// CacheConfig returns a reference to [bc.cacheConfig].
+//
+// This is used by [miner] to set prefetch parallelism
+// during block building.
+func (bc *BlockChain) CacheConfig() *CacheConfig {
+	return bc.cacheConfig
+}
diff --git a/core/state/statedb.go b/core/state/statedb.go
index 566344a4c7..38c39075dd 100644
--- a/core/state/statedb.go
+++ b/core/state/statedb.go
@@ -204,7 +204,7 @@ func (s *StateDB) SetLogger(l *tracing.Hooks) {
 // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
 // state trie concurrently while the state is mutated so that when we reach the
 // commit phase, most of the needed data is already hot.
-func (s *StateDB) StartPrefetcher(namespace string, witness *stateless.Witness) {
+func (s *StateDB) StartPrefetcher(namespace string, witness *stateless.Witness, maxConcurrency int) {
 	// Terminate any previously running prefetcher
 	s.StopPrefetcher()
 
@@ -220,7 +220,7 @@ func (s *StateDB) StartPrefetcher(namespace string, witness *stateless.Witness)
 	// To prevent this, the account trie is always scheduled for prefetching once
 	// the prefetcher is constructed. For more details, see:
 	// https://github.com/ethereum/go-ethereum/issues/29880
-	s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, witness == nil)
+	s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, witness == nil, maxConcurrency)
 	if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, nil, false); err != nil {
 		log.Error("Failed to prefetch account trie", "root", s.originalRoot, "err", err)
 	}
diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go
index 29dfdf04fa..904d323cf9 100644
--- a/core/state/trie_prefetcher.go
+++ b/core/state/trie_prefetcher.go
@@ -47,6 +47,9 @@ type triePrefetcher struct {
 	term     chan struct{} // Channel to signal interruption
 	noreads  bool          // Whether to ignore state-read-only prefetch requests
 
+	maxConcurrency int         // Maximum number of concurrent disk reads across all subfetchers
+	workers        workerGroup // Shared group of workers executing the prefetch reads
+
 	deliveryMissMeter metrics.Meter
 
 	accountLoadReadMeter  metrics.Meter
@@ -64,8 +67,10 @@ type triePrefetcher struct {
 	storageWasteMeter metrics.Meter
 }
 
-func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads bool) *triePrefetcher {
+func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads bool, maxConcurrency int) *triePrefetcher {
 	prefix := triePrefetchMetricsPrefix + namespace
+	workers := newWorkerGroup(maxConcurrency == 1)
+	workers.SetLimit(maxConcurrency)
 	return &triePrefetcher{
 		verkle:   db.TrieDB().IsVerkle(),
 		db:       db,
@@ -74,6 +79,9 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads
 		term:    make(chan struct{}),
 		noreads: noreads,
 
+		maxConcurrency: maxConcurrency,
+		workers:        workers, // the group constructed above, with its concurrency limit applied
+
 		deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
 
 		accountLoadReadMeter:  metrics.GetOrRegisterMeter(prefix+"/account/load/read", nil),
@@ -172,7 +180,11 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm
 	id := p.trieID(owner, root)
 	fetcher := p.fetchers[id]
 	if fetcher == nil {
-		fetcher = newSubfetcher(p.db, p.root, owner, root, addr)
+		var err error
+		fetcher, err = newSubfetcher(p.db, p.root, owner, root, addr, p.workers)
+		if err != nil {
+			return err
+		}
 		p.fetchers[id] = fetcher
 	}
 	return fetcher.schedule(keys, read)
@@ -228,6 +240,10 @@ type subfetcher struct {
 	addr  common.Address // Address of the account that the trie belongs to
 	trie  Trie           // Trie being populated with nodes
 
+	workers   workerGroup     // Group of workers shared with the parent prefetcher
+	pool      *sync.Pool      // Pool of trie copies handed out to the workers
+	waitGroup *sync.WaitGroup // Tracks in-flight prefetch tasks
+
 	tasks []*subfetcherTask // Items queued up for retrieval
 	lock  sync.Mutex        // Lock protecting the task queue
 
@@ -254,21 +270,40 @@ type subfetcherTask struct {
 // newSubfetcher creates a goroutine to prefetch state items belonging to a
 // particular root hash.
-func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
+func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address, workers workerGroup) (*subfetcher, error) {
 	sf := &subfetcher{
 		db:        db,
 		state:     state,
 		owner:     owner,
 		root:      root,
 		addr:      addr,
+		workers:   workers,
+		waitGroup: new(sync.WaitGroup),
 		wake:      make(chan struct{}, 1),
 		stop:      make(chan struct{}),
 		term:      make(chan struct{}),
 		seenRead:  make(map[string]struct{}),
 		seenWrite: make(map[string]struct{}),
 	}
+	tr, err := sf.openTrie()
+	if err != nil {
+		return nil, err
+	}
+	sf.trie = tr
+	sf.pool = &sync.Pool{
+		New: func() any {
+			return sf.copyTrie()
+		},
+	}
 	go sf.loop()
-	return sf
+	return sf, nil
+}
+
+// copyTrie returns a copy of the subfetcher's trie, taken under lock so that
+// a copy is never made of a trie that is concurrently being mutated.
+func (sf *subfetcher) copyTrie() Trie {
+	sf.lock.Lock()
+	defer sf.lock.Unlock()
+
+	return mustCopyTrie(sf.trie)
 }
 
 // schedule adds a batch of trie keys to the queue to prefetch.
@@ -300,17 +335,19 @@ func (sf *subfetcher) schedule(keys [][]byte, read bool) error {
 // wait blocks until the subfetcher terminates. This method is used to block on
 // an async termination before accessing internal fields from the fetcher.
 func (sf *subfetcher) wait() {
+	sf.waitGroup.Wait()
 	<-sf.term
 }
 
 // peek retrieves the fetcher's trie, populated with any pre-fetched data. The
 // returned trie will be a shallow copy, so modifying it will break subsequent
 // peeks for the original data. The method will block until all the scheduled
-// data has been loaded and the fethcer terminated.
+// data has been loaded and the subfetcher terminated.
 func (sf *subfetcher) peek() Trie {
-	// Block until the fetcher terminates, then retrieve the trie
+	// Block until the subfetcher terminates, then retrieve the trie
 	sf.wait()
-	return sf.trie
+
+	return sf.copyTrie()
 }
 
 // terminate requests the subfetcher to stop accepting new tasks and spin down
@@ -329,35 +366,32 @@ func (sf *subfetcher) terminate(async bool) {
 }
 
 // openTrie resolves the target trie from database for prefetching.
-func (sf *subfetcher) openTrie() error {
+func (sf *subfetcher) openTrie() (Trie, error) {
 	// Open the verkle tree if the sub-fetcher is in verkle mode. Note, there is
 	// only a single fetcher for verkle.
 	if sf.db.TrieDB().IsVerkle() {
 		tr, err := sf.db.OpenTrie(sf.state)
 		if err != nil {
 			log.Warn("Trie prefetcher failed opening verkle trie", "root", sf.root, "err", err)
-			return err
+			return nil, err
 		}
-		sf.trie = tr
-		return nil
+		return tr, nil
 	}
 	// Open the merkle tree if the sub-fetcher is in merkle mode
 	if sf.owner == (common.Hash{}) {
 		tr, err := sf.db.OpenTrie(sf.state)
 		if err != nil {
 			log.Warn("Trie prefetcher failed opening account trie", "root", sf.root, "err", err)
-			return err
+			return nil, err
 		}
-		sf.trie = tr
-		return nil
+		return tr, nil
 	}
 	tr, err := sf.db.OpenStorageTrie(sf.state, sf.addr, sf.root, nil)
 	if err != nil {
 		log.Warn("Trie prefetcher failed opening storage trie", "root", sf.root, "err", err)
-		return err
+		return nil, err
 	}
-	sf.trie = tr
-	return nil
+	return tr, nil
 }
 
 // loop loads newly-scheduled trie tasks as they are received and loads them, stopping
@@ -366,9 +400,6 @@ func (sf *subfetcher) loop() {
 	// No matter how the loop stops, signal anyone waiting that it's terminated
 	defer close(sf.term)
 
-	if err := sf.openTrie(); err != nil {
-		return
-	}
 	for {
 		select {
 		case <-sf.wake:
@@ -376,39 +407,55 @@ func (sf *subfetcher) loop() {
 			sf.lock.Lock()
 			tasks := sf.tasks
 			sf.tasks = nil
+			sf.waitGroup.Add(len(tasks))
 			sf.lock.Unlock()
 
 			for _, task := range tasks {
+				task := task // pin the loop variable: the worker closure below may outlive this iteration (pre-Go 1.22 loop semantics)
 				key := string(task.key)
+				isDuplicate := false
+
+				sf.lock.Lock()
 				if task.read {
 					if _, ok := sf.seenRead[key]; ok {
 						sf.dupsRead++
-						continue
+						isDuplicate = true
 					}
 					if _, ok := sf.seenWrite[key]; ok {
 						sf.dupsCross++
-						continue
+						isDuplicate = true
 					}
+					sf.seenRead[key] = struct{}{}
 				} else {
 					if _, ok := sf.seenRead[key]; ok {
 						sf.dupsCross++
-						continue
+						isDuplicate = true
 					}
 					if _, ok := sf.seenWrite[key]; ok {
 						sf.dupsWrite++
-						continue
+						isDuplicate = true
 					}
-				}
-				if len(task.key) == common.AddressLength {
-					sf.trie.GetAccount(common.BytesToAddress(task.key))
-				} else {
-					sf.trie.GetStorage(sf.addr, task.key)
-				}
-				if task.read {
-					sf.seenRead[key] = struct{}{}
-				} else {
 					sf.seenWrite[key] = struct{}{}
 				}
+				sf.lock.Unlock()
+
+				if isDuplicate {
+					sf.waitGroup.Done()
+					continue
+				}
+
+				sf.workers.Go(func() error {
+					defer sf.waitGroup.Done()
+
+					trie := sf.pool.Get().(Trie)
+					if len(task.key) == common.AddressLength {
+						trie.GetAccount(common.BytesToAddress(task.key))
+					} else {
+						trie.GetStorage(sf.addr, task.key)
+					}
+					sf.pool.Put(trie)
+					return nil
+				})
 			}
 
 		case <-sf.stop:
diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go
index 529b42d39c..781b3b731b 100644
--- a/core/state/trie_prefetcher_test.go
+++ b/core/state/trie_prefetcher_test.go
@@ -50,7 +50,7 @@ func filledStateDB() *StateDB {
 
 func TestUseAfterTerminate(t *testing.T) {
 	db := filledStateDB()
-	prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", true)
+	prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", true, 4)
 	skey := common.HexToHash("aaa")
 
 	if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}, false); err != nil {
@@ -87,7 +87,7 @@ func TestVerklePrefetcher(t *testing.T) {
 	state, _ = New(root, sdb)
 	sRoot := state.GetStorageRoot(addr)
 
-	fetcher := newTriePrefetcher(sdb, root, "", false)
+	fetcher := newTriePrefetcher(sdb, root, "", false, 4)
 
 	// Read account
 	fetcher.prefetch(common.Hash{}, root, common.Address{}, [][]byte{
diff --git a/eth/api_debug.go b/eth/api_debug.go
index ab8dc7420f..d4aea44cce 100644
--- a/eth/api_debug.go
+++ b/eth/api_debug.go
@@ -474,7 +474,7 @@ func generateWitness(blockchain *core.BlockChain, block *types.Block) (*stateles
 		return nil, fmt.Errorf("failed to retrieve parent state: %w", err)
 	}
 
-	statedb.StartPrefetcher("debug_execution_witness", witness)
+	statedb.StartPrefetcher("debug_execution_witness", witness, blockchain.CacheConfig().TriePrefetcherParallelism)
 	defer statedb.StopPrefetcher()
 
 	res, err := blockchain.Processor().Process(block, statedb, *blockchain.GetVMConfig())
diff --git a/miner/worker.go b/miner/worker.go
index 864c5ae2ff..09bfd2c6d2 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -331,7 +331,7 @@ func (miner *Miner) makeEnv(parent *types.Header, header *types.Header, coinbase
 		if err != nil {
 			return nil, err
 		}
-		state.StartPrefetcher("miner", bundle)
+		state.StartPrefetcher("miner", bundle, miner.backend.BlockChain().CacheConfig().TriePrefetcherParallelism)
 	}
 	// Note the passed coinbase may be different with header.Coinbase.
 	return &environment{
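
Note on the wiring: the new knob defaults to 16 and flows from the CLI flag (e.g. --cache.prefetcher.parallelism 32, assuming the flag is registered with the relevant command) through core.CacheConfig.TriePrefetcherParallelism into every StartPrefetcher call site touched above: chain import, the miner, and debug witness generation. Passing 1 degenerates to the old serial behaviour, since newWorkerGroup(maxConcurrency == 1) selects a serial executor.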
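The patch relies on a workerGroup helper (newWorkerGroup, SetLimit, Go) whose definition is not part of this diff. The sketch below is a minimal illustration of what such a helper could look like, inferred purely from those call sites; the concrete types (parallelGroup, serialGroup) and the use of golang.org/x/sync/errgroup are assumptions, not the actual implementation.

// A hypothetical sketch of the workerGroup helper assumed by the patch; only
// the two methods used at the call sites are modeled. Task completion is
// tracked externally by the subfetcher's sync.WaitGroup, so no Wait method
// is needed here.
package state

import "golang.org/x/sync/errgroup"

// workerGroup abstracts over parallel and inline task execution.
type workerGroup interface {
	Go(f func() error)
	SetLimit(n int)
}

// newWorkerGroup returns an executor that runs tasks inline when serial is
// true (the maxConcurrency == 1 case) and an errgroup-backed pool otherwise.
func newWorkerGroup(serial bool) workerGroup {
	if serial {
		return serialGroup{}
	}
	return &parallelGroup{}
}

type parallelGroup struct{ eg errgroup.Group }

// Go schedules f on a goroutine, blocking if the concurrency limit is reached.
func (g *parallelGroup) Go(f func() error) { g.eg.Go(f) }

// SetLimit caps the number of concurrently running tasks.
func (g *parallelGroup) SetLimit(n int) { g.eg.SetLimit(n) }

type serialGroup struct{}

// Go runs f synchronously on the caller's goroutine (the prefetch tasks in
// the patch always return nil, so discarding the error is safe here).
func (serialGroup) Go(f func() error) { _ = f() }

// SetLimit is a no-op: inline execution already has an effective limit of 1.
func (serialGroup) SetLimit(int) {}

Running tasks inline in the serial case avoids goroutine and scheduling overhead, which would explain why newWorkerGroup takes a boolean instead of always spawning workers.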
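The other load-bearing idea is handing each concurrent read its own trie: Trie implementations are not safe for concurrent use, so workers check a private copy out of a sync.Pool (seeded via copyTrie) and return it when done. The point of each read is to warm the shared underlying node cache, not to mutate the subfetcher's own trie. The self-contained toy below (fakeTrie is a stand-in type, the counts are arbitrary) shows why the pool bounds the number of live copies to roughly the concurrency limit rather than the task count:

// Illustration only, not part of the patch: N throttled workers share a
// sync.Pool of expensive-to-create resources, so at most about N instances
// ever exist, mirroring sf.pool above.
package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

type fakeTrie struct{ id int }

func main() {
	var (
		mu     sync.Mutex
		copies int
	)
	// New is invoked only when the pool is empty, so the number of copies
	// made tracks the number of tasks in flight, not the total task count.
	pool := &sync.Pool{
		New: func() any {
			mu.Lock()
			defer mu.Unlock()
			copies++
			return &fakeTrie{id: copies}
		},
	}

	var g errgroup.Group
	g.SetLimit(4) // plays the role of maxConcurrency
	for i := 0; i < 100; i++ {
		g.Go(func() error {
			tr := pool.Get().(*fakeTrie)
			_ = tr // a real worker calls tr.GetAccount / tr.GetStorage here
			pool.Put(tr)
			return nil
		})
	}
	_ = g.Wait()
	fmt.Printf("served 100 tasks with %d trie copies\n", copies) // typically <= 4
}

A fresh copy per task would also be correct, but mustCopyTrie is not free; recycling copies through the pool keeps the allocation count near maxConcurrency instead of near the number of scheduled keys.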