Enable parallel prefetching of nodes within the same storage trie #425

Draft · wants to merge 3 commits into base: optimism
25 changes: 16 additions & 9 deletions cmd/utils/flags.go
@@ -495,6 +495,12 @@ var (
Usage: "Disable heuristic state prefetch during block import (less CPU and disk IO, more time waiting for data)",
Category: flags.PerfCategory,
}
CachePrefetcherParallelismFlag = &cli.IntFlag{
Name: "cache.prefetcher.parallelism",
Usage: "Maximum number of concurrent disk reads trie prefetcher should perform at once",
Value: 16,
Category: flags.PerfCategory,
}
CachePreimagesFlag = &cli.BoolFlag{
Name: "cache.preimages",
Usage: "Enable recording the SHA3/keccak preimages of trie keys",
@@ -2331,15 +2337,16 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readonly bool) (*core.BlockCh
Fatalf("%v", err)
}
cache := &core.CacheConfig{
TrieCleanLimit: ethconfig.Defaults.TrieCleanCache,
TrieCleanNoPrefetch: ctx.Bool(CacheNoPrefetchFlag.Name),
TrieDirtyLimit: ethconfig.Defaults.TrieDirtyCache,
TrieDirtyDisabled: ctx.String(GCModeFlag.Name) == "archive",
TrieTimeLimit: ethconfig.Defaults.TrieTimeout,
SnapshotLimit: ethconfig.Defaults.SnapshotCache,
Preimages: ctx.Bool(CachePreimagesFlag.Name),
StateScheme: scheme,
StateHistory: ctx.Uint64(StateHistoryFlag.Name),
TrieCleanLimit: ethconfig.Defaults.TrieCleanCache,
TrieCleanNoPrefetch: ctx.Bool(CacheNoPrefetchFlag.Name),
TrieDirtyLimit: ethconfig.Defaults.TrieDirtyCache,
TrieDirtyDisabled: ctx.String(GCModeFlag.Name) == "archive",
TrieTimeLimit: ethconfig.Defaults.TrieTimeout,
TriePrefetcherParallelism: ctx.Int(CachePrefetcherParallelismFlag.Name),
SnapshotLimit: ethconfig.Defaults.SnapshotCache,
Preimages: ctx.Bool(CachePreimagesFlag.Name),
StateScheme: scheme,
StateHistory: ctx.Uint64(StateHistoryFlag.Name),
}
if cache.TrieDirtyDisabled && !cache.Preimages {
cache.Preimages = true
42 changes: 26 additions & 16 deletions core/blockchain.go
@@ -132,15 +132,16 @@ const (
// CacheConfig contains the configuration values for the trie database
// and state snapshot these are resident in a blockchain.
type CacheConfig struct {
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
TrieCleanNoPrefetch bool // Whether to disable heuristic state prefetching for followup blocks
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
Preimages bool // Whether to store preimage of trie key to the disk
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
TrieCleanNoPrefetch bool // Whether to disable heuristic state prefetching for followup blocks
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
TriePrefetcherParallelism int // Maximum number of concurrent disk reads the trie prefetcher may perform
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
Preimages bool // Whether to store preimage of trie key to the disk
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top

SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
@@ -170,12 +171,13 @@ func (c *CacheConfig) triedbConfig(isVerkle bool) *triedb.Config {
// defaultCacheConfig are the default caching values if none are specified by the
// user (also used during testing).
var defaultCacheConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: true,
StateScheme: rawdb.HashScheme,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TriePrefetcherParallelism: 16,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: true,
StateScheme: rawdb.HashScheme,
}

// DefaultCacheConfigWithScheme returns a deep copied default cache config with
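
For orientation, here is a minimal sketch (not part of this diff) of what the new knob looks like to a library consumer building a `CacheConfig` by hand; everything except `TriePrefetcherParallelism` mirrors `defaultCacheConfig` above, and node operators reach the same setting through the new `--cache.prefetcher.parallelism` flag (default 16).

```go
package example

import (
	"time"

	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/rawdb"
)

// newCacheConfig mirrors defaultCacheConfig but raises the prefetcher
// parallelism. Purely illustrative; real nodes get this value from the CLI flag.
func newCacheConfig() *core.CacheConfig {
	return &core.CacheConfig{
		TrieCleanLimit:            256,
		TrieDirtyLimit:            256,
		TriePrefetcherParallelism: 32, // allow up to 32 concurrent disk reads per prefetcher
		TrieTimeLimit:             5 * time.Minute,
		SnapshotLimit:             256,
		SnapshotWait:              true,
		StateScheme:               rawdb.HashScheme,
	}
}
```
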
@@ -1795,7 +1797,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
return nil, it.index, err
}
}
statedb.StartPrefetcher("chain", witness)
statedb.StartPrefetcher("chain", witness, bc.cacheConfig.TriePrefetcherParallelism)
}
activeState = statedb

@@ -2541,3 +2543,11 @@ func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) {
func (bc *BlockChain) GetTrieFlushInterval() time.Duration {
return time.Duration(bc.flushInterval.Load())
}

// CacheConfig returns a reference to [bc.cacheConfig]
//
// This is used by [miner] to set prefetch parallelism
// during block building.
func (bc *BlockChain) CacheConfig() *CacheConfig {
return bc.cacheConfig
}
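
As a usage illustration of this accessor: the miner changes are not part of this diff, so the function and namespace below are assumptions, but the intent is that block building starts its prefetcher with the same configured parallelism.

```go
package miner

import (
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/state"
)

// prefetchForBuilding is a sketch of how a block builder could reuse the
// chain's configured prefetch parallelism via the new CacheConfig accessor.
func prefetchForBuilding(chain *core.BlockChain, statedb *state.StateDB) {
	statedb.StartPrefetcher("miner", nil, chain.CacheConfig().TriePrefetcherParallelism)
	defer statedb.StopPrefetcher()

	// ... apply transactions against statedb; the prefetcher warms trie nodes
	// in the background so the commit phase mostly finds hot data ...
}
```
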
4 changes: 2 additions & 2 deletions core/state/statedb.go
@@ -204,7 +204,7 @@ func (s *StateDB) SetLogger(l *tracing.Hooks) {
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
func (s *StateDB) StartPrefetcher(namespace string, witness *stateless.Witness) {
func (s *StateDB) StartPrefetcher(namespace string, witness *stateless.Witness, maxConcurrency int) {
// Terminate any previously running prefetcher
s.StopPrefetcher()

@@ -220,7 +220,7 @@ func (s *StateDB) StartPrefetcher(namespace string, witness *stateless.Witness)
// To prevent this, the account trie is always scheduled for prefetching once
// the prefetcher is constructed. For more details, see:
// https://github.com/ethereum/go-ethereum/issues/29880
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, witness == nil)
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, witness == nil, maxConcurrency)
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, nil, false); err != nil {
log.Error("Failed to prefetch account trie", "root", s.originalRoot, "err", err)
}
112 changes: 79 additions & 33 deletions core/state/trie_prefetcher.go
@@ -47,6 +47,9 @@ type triePrefetcher struct {
term chan struct{} // Channel to signal interruption
noreads bool // Whether to ignore state-read-only prefetch requests

maxConcurrency int
workers workerGroup

deliveryMissMeter metrics.Meter

accountLoadReadMeter metrics.Meter
@@ -64,8 +67,10 @@ type triePrefetcher struct {
storageWasteMeter metrics.Meter
}

func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads bool) *triePrefetcher {
func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads bool, maxConcurrency int) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
workers := newWorkerGroup(maxConcurrency == 1)
workers.SetLimit(maxConcurrency)
return &triePrefetcher{
verkle: db.TrieDB().IsVerkle(),
db: db,
@@ -74,6 +79,9 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads
term: make(chan struct{}),
noreads: noreads,

maxConcurrency: maxConcurrency,
workers: workers, // reuse the group configured with SetLimit above

deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),

accountLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/read", nil),
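
The `workerGroup` helper used above is not shown in this diff. A plausible shape for it, assuming it wraps `golang.org/x/sync/errgroup` and falls back to inline execution when parallelism is 1, is sketched below; with errgroup, `Go` blocks once the limit is reached, which is what actually throttles the number of in-flight disk reads.

```go
package state

import "golang.org/x/sync/errgroup"

// workerGroup is the minimal surface the prefetcher relies on; the real type
// lives outside this diff, so treat this sketch as an assumption.
type workerGroup interface {
	Go(f func() error)
	SetLimit(n int)
	Wait() error
}

// newWorkerGroup returns an inline executor when serial is true (i.e. a
// parallelism of 1), otherwise an errgroup whose size is capped via SetLimit.
func newWorkerGroup(serial bool) workerGroup {
	if serial {
		return &serialGroup{}
	}
	return new(errgroup.Group) // errgroup.Group already provides Go, SetLimit and Wait
}

// serialGroup runs every task on the caller's goroutine, preserving the old
// single-threaded prefetch behaviour.
type serialGroup struct{ err error }

func (s *serialGroup) Go(f func() error) {
	if err := f(); err != nil && s.err == nil {
		s.err = err
	}
}

func (s *serialGroup) SetLimit(int) {}

func (s *serialGroup) Wait() error { return s.err }
```
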
@@ -172,7 +180,11 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm
id := p.trieID(owner, root)
fetcher := p.fetchers[id]
if fetcher == nil {
fetcher = newSubfetcher(p.db, p.root, owner, root, addr)
var err error
fetcher, err = newSubfetcher(p.db, p.root, owner, root, addr, p.workers)
if err != nil {
return err
}
p.fetchers[id] = fetcher
}
return fetcher.schedule(keys, read)
@@ -228,6 +240,10 @@ type subfetcher struct {
addr common.Address // Address of the account that the trie belongs to
trie Trie // Trie being populated with nodes

workers workerGroup
pool *sync.Pool
waitGroup *sync.WaitGroup

tasks []*subfetcherTask // Items queued up for retrieval
lock sync.Mutex // Lock protecting the task queue

@@ -254,21 +270,40 @@ type subfetcherTask struct {

// newSubfetcher creates a goroutine to prefetch state items belonging to a
// particular root hash.
func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address, workers workerGroup) (*subfetcher, error) {
sf := &subfetcher{
db: db,
state: state,
owner: owner,
root: root,
addr: addr,
workers: workers,
waitGroup: new(sync.WaitGroup),
wake: make(chan struct{}, 1),
stop: make(chan struct{}),
term: make(chan struct{}),
seenRead: make(map[string]struct{}),
seenWrite: make(map[string]struct{}),
}
tr, err := sf.openTrie()
if err != nil {
return nil, err
}
sf.trie = tr
sf.pool = &sync.Pool{
New: func() any {
return sf.copyTrie()
},
}
go sf.loop()
return sf
return sf, nil
}

func (sf *subfetcher) copyTrie() Trie {
sf.lock.Lock()
defer sf.lock.Unlock()

return mustCopyTrie(sf.trie)
}

// schedule adds a batch of trie keys to the queue to prefetch.
@@ -300,17 +335,19 @@ func (sf *subfetcher) schedule(keys [][]byte, read bool) error {
// wait blocks until the subfetcher terminates. This method is used to block on
// an async termination before accessing internal fields from the fetcher.
func (sf *subfetcher) wait() {
sf.waitGroup.Wait()
<-sf.term
}

// peek retrieves the fetcher's trie, populated with any pre-fetched data. The
// returned trie will be a shallow copy, so modifying it will break subsequent
// peeks for the original data. The method will block until all the scheduled
// data has been loaded and the fethcer terminated.
// data has been loaded and the subfetcher terminated.
func (sf *subfetcher) peek() Trie {
// Block until the fetcher terminates, then retrieve the trie
// Block until the subfetcher terminates, then retrieve the trie
sf.wait()
return sf.trie

return sf.copyTrie()
}

// terminate requests the subfetcher to stop accepting new tasks and spin down
@@ -329,35 +366,32 @@ func (sf *subfetcher) terminate(async bool) {
}

// openTrie resolves the target trie from database for prefetching.
func (sf *subfetcher) openTrie() error {
func (sf *subfetcher) openTrie() (Trie, error) {
// Open the verkle tree if the sub-fetcher is in verkle mode. Note, there is
// only a single fetcher for verkle.
if sf.db.TrieDB().IsVerkle() {
tr, err := sf.db.OpenTrie(sf.state)
if err != nil {
log.Warn("Trie prefetcher failed opening verkle trie", "root", sf.root, "err", err)
return err
return nil, err
}
sf.trie = tr
return nil
return tr, nil
}
// Open the merkle tree if the sub-fetcher is in merkle mode
if sf.owner == (common.Hash{}) {
tr, err := sf.db.OpenTrie(sf.state)
if err != nil {
log.Warn("Trie prefetcher failed opening account trie", "root", sf.root, "err", err)
return err
return nil, err
}
sf.trie = tr
return nil
return tr, nil
}
tr, err := sf.db.OpenStorageTrie(sf.state, sf.addr, sf.root, nil)
if err != nil {
log.Warn("Trie prefetcher failed opening storage trie", "root", sf.root, "err", err)
return err
return nil, err
}
sf.trie = tr
return nil
return tr, nil
}

// loop loads newly-scheduled trie tasks as they are received and loads them, stopping
Expand All @@ -366,49 +400,61 @@ func (sf *subfetcher) loop() {
// No matter how the loop stops, signal anyone waiting that it's terminated
defer close(sf.term)

if err := sf.openTrie(); err != nil {
return
}
for {
select {
case <-sf.wake:
// Execute all remaining tasks in a single run
sf.lock.Lock()
tasks := sf.tasks
sf.tasks = nil
sf.waitGroup.Add(len(tasks))
sf.lock.Unlock()

for _, task := range tasks {
key := string(task.key)
isDuplicate := false

sf.lock.Lock()
if task.read {
if _, ok := sf.seenRead[key]; ok {
sf.dupsRead++
continue
isDuplicate = true
}
if _, ok := sf.seenWrite[key]; ok {
sf.dupsCross++
continue
isDuplicate = true
}
sf.seenRead[key] = struct{}{}
} else {
if _, ok := sf.seenRead[key]; ok {
sf.dupsCross++
continue
isDuplicate = true
}
if _, ok := sf.seenWrite[key]; ok {
sf.dupsWrite++
continue
isDuplicate = true
}
}
if len(task.key) == common.AddressLength {
sf.trie.GetAccount(common.BytesToAddress(task.key))
} else {
sf.trie.GetStorage(sf.addr, task.key)
}
if task.read {
sf.seenRead[key] = struct{}{}
} else {
sf.seenWrite[key] = struct{}{}
}
sf.lock.Unlock()

if isDuplicate {
sf.waitGroup.Done()
continue
}

sf.workers.Go(func() error {
defer sf.waitGroup.Done()

trie := sf.pool.Get().(Trie)
if len(task.key) == common.AddressLength {
trie.GetAccount(common.BytesToAddress(task.key))
} else {
trie.GetStorage(sf.addr, task.key)
}
sf.pool.Put(trie)
return nil
})
}

case <-sf.stop:
4 changes: 2 additions & 2 deletions core/state/trie_prefetcher_test.go
@@ -50,7 +50,7 @@ func filledStateDB() *StateDB {

func TestUseAfterTerminate(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", true)
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", true, 4)
skey := common.HexToHash("aaa")

if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}, false); err != nil {
@@ -87,7 +87,7 @@ func TestVerklePrefetcher(t *testing.T) {

state, _ = New(root, sdb)
sRoot := state.GetStorageRoot(addr)
fetcher := newTriePrefetcher(sdb, root, "", false)
fetcher := newTriePrefetcher(sdb, root, "", false, 4)

// Read account
fetcher.prefetch(common.Hash{}, root, common.Address{}, [][]byte{
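
Only the constructor calls change in the existing tests. A test that pushes a larger batch through the bounded worker pool might look roughly like the sketch below; illustrative only — `filledStateDB` comes from the existing tests, and the `terminate(false)` call at the end is an assumption about the prefetcher's shutdown API.

```go
// TestParallelPrefetchManyKeys schedules more keys than there are workers and
// checks that scheduling neither fails nor deadlocks. Illustrative sketch.
func TestParallelPrefetchManyKeys(t *testing.T) {
	db := filledStateDB()
	prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", true, 4)

	keys := make([][]byte, 0, 64)
	for i := byte(0); i < 64; i++ {
		keys = append(keys, common.BytesToHash([]byte{i}).Bytes())
	}
	if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, keys, false); err != nil {
		t.Fatalf("prefetch failed: %v", err)
	}
	prefetcher.terminate(false)
}
```
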
2 changes: 1 addition & 1 deletion eth/api_debug.go
@@ -474,7 +474,7 @@ func generateWitness(blockchain *core.BlockChain, block *types.Block) (*stateles
return nil, fmt.Errorf("failed to retrieve parent state: %w", err)
}

statedb.StartPrefetcher("debug_execution_witness", witness)
statedb.StartPrefetcher("debug_execution_witness", witness, blockchain.CacheConfig().TriePrefetcherParallelism)
defer statedb.StopPrefetcher()

res, err := blockchain.Processor().Process(block, statedb, *blockchain.GetVMConfig())