diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index fd547ebce..20de26f8c 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -20,6 +20,7 @@ package fetcher import ( "errors" "math/rand" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -32,10 +33,11 @@ import ( ) const ( - lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested - arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested - gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches - fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction + lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested + arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested + gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches + fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction + cleanMissingParentInterval = 30 * time.Second // Interval to clean missing parent mapping ) const ( @@ -183,6 +185,10 @@ type BlockFetcher struct { queues map[string]int // Per peer block counts to prevent memory exhaustion queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) + missingParentLock sync.Mutex // Protect missingParent mapping from concurrent use + missingParent map[common.Hash][]common.Hash // Mapping from parent hash to slice of block hashes of missing parent blocks + importMissingParent chan common.Hash + // Callbacks getHeader HeaderRetrievalFn // Retrieves a header from the local chain getBlock blockRetrievalFn // Retrieves a block from the local chain @@ -209,30 +215,32 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr ) *BlockFetcher { return &BlockFetcher{ - light: light, - notify: make(chan *blockAnnounce), - inject: make(chan *blockOrHeaderInject), - headerFilter: make(chan chan *headerFilterTask), - bodyFilter: make(chan chan *bodyFilterTask), - done: make(chan common.Hash), - quit: make(chan struct{}), - announces: make(map[string]int), - announced: make(map[common.Hash][]*blockAnnounce), - fetching: make(map[common.Hash]*blockAnnounce), - fetched: make(map[common.Hash][]*blockAnnounce), - completing: make(map[common.Hash]*blockAnnounce), - queue: prque.New(nil), - queues: make(map[string]int), - queued: make(map[common.Hash]*blockOrHeaderInject), - getHeader: getHeader, - getBlock: getBlock, - verifyHeader: verifyHeader, - verifyBlobHeader: verifyBlobHeader, - broadcastBlock: broadcastBlock, - chainHeight: chainHeight, - insertHeaders: insertHeaders, - insertChain: insertChain, - dropPeer: dropPeer, + light: light, + notify: make(chan *blockAnnounce), + inject: make(chan *blockOrHeaderInject), + headerFilter: make(chan chan *headerFilterTask), + bodyFilter: make(chan chan *bodyFilterTask), + done: make(chan common.Hash), + quit: make(chan struct{}), + announces: make(map[string]int), + announced: make(map[common.Hash][]*blockAnnounce), + fetching: make(map[common.Hash]*blockAnnounce), + fetched: make(map[common.Hash][]*blockAnnounce), + completing: make(map[common.Hash]*blockAnnounce), + queue: prque.New(nil), + queues: make(map[string]int), + queued: make(map[common.Hash]*blockOrHeaderInject), + missingParent: make(map[common.Hash][]common.Hash), + importMissingParent: make(chan common.Hash, blockLimit), + getHeader: getHeader, + getBlock: getBlock, + verifyHeader: verifyHeader, + verifyBlobHeader: verifyBlobHeader, + broadcastBlock: broadcastBlock, + chainHeight: chainHeight, + insertHeaders: insertHeaders, + insertChain: insertChain, + dropPeer: dropPeer, } } @@ -344,13 +352,15 @@ func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transac func (f *BlockFetcher) loop() { // Iterate the block fetching until a quit is requested var ( - fetchTimer = time.NewTimer(0) - completeTimer = time.NewTimer(0) + fetchTimer = time.NewTimer(0) + completeTimer = time.NewTimer(0) + cleanMissingParentTicker = time.NewTicker(cleanMissingParentInterval) ) <-fetchTimer.C // clear out the channel <-completeTimer.C defer fetchTimer.Stop() defer completeTimer.Stop() + defer cleanMissingParentTicker.Stop() for { // Clean up any expired block fetches @@ -378,7 +388,9 @@ func (f *BlockFetcher) loop() { } // Otherwise if fresh and still unknown, try and import if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) { + f.missingParentLock.Lock() f.forgetBlock(hash) + f.missingParentLock.Unlock() continue } if f.light { @@ -442,7 +454,9 @@ func (f *BlockFetcher) loop() { case hash := <-f.done: // A pending import finished, remove all traces of the notification f.forgetHash(hash) + f.missingParentLock.Lock() f.forgetBlock(hash) + f.missingParentLock.Unlock() case <-fetchTimer.C: // At least one block's timer ran out, check for needing retrieval @@ -684,6 +698,28 @@ func (f *BlockFetcher) loop() { f.enqueue(announce.origin, nil, block, sidecars) } } + + case hash := <-f.importMissingParent: + if op := f.queued[hash]; op != nil { + if f.light { + f.importHeaders(op.origin, op.header) + } else { + f.importBlocks(op.origin, op.block, op.sidecars) + } + } + case <-cleanMissingParentTicker.C: + height := f.chainHeight() + f.missingParentLock.Lock() + for _, blocks := range f.missingParent { + for _, block := range blocks { + if op := f.queued[block]; op != nil { + if op.number()+maxUncleDist < height { + f.forgetBlock(block) + } + } + } + } + f.missingParentLock.Unlock() } } } @@ -780,13 +816,16 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) { log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash) go func() { - defer func() { f.done <- hash }() - // If the parent's unknown, abort insertion + // If the parent's unknown, queue for later processing when parent block is imported parent := f.getHeader(header.ParentHash) if parent == nil { log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash) + f.missingParentLock.Lock() + f.missingParent[header.ParentHash] = append(f.missingParent[header.ParentHash], hash) + f.missingParentLock.Unlock() return } + defer func() { f.done <- hash }() // Validate the header and if something went wrong, drop the peer if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock { log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err) @@ -798,6 +837,14 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) { log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err) return } + f.missingParentLock.Lock() + nextBlockHashes, ok := f.missingParent[hash] + f.missingParentLock.Unlock() + if ok { + for _, nextBlockHash := range nextBlockHashes { + f.importMissingParent <- nextBlockHash + } + } // Invoke the testing hook if needed if f.importedHook != nil { f.importedHook(header, nil) @@ -814,14 +861,17 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block, sidecars [] // Run the import on a new thread log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash) go func() { - defer func() { f.done <- hash }() - - // If the parent's unknown, abort insertion - parent := f.getBlock(block.ParentHash()) + // If the parent's unknown, queue for later processing when parent block is imported + parentHash := block.ParentHash() + parent := f.getBlock(parentHash) if parent == nil { - log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash()) + log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", parentHash) + f.missingParentLock.Lock() + f.missingParent[parentHash] = append(f.missingParent[parentHash], hash) + f.missingParentLock.Unlock() return } + defer func() { f.done <- hash }() // Quickly validate the header and propagate the block if it passes err := f.verifyHeader(block.Header()) if err == nil { @@ -853,6 +903,14 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block, sidecars [] blockAnnounceOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, nil, false) + f.missingParentLock.Lock() + nextBlockHashes, ok := f.missingParent[hash] + f.missingParentLock.Unlock() + if ok { + for _, nextBlockHash := range nextBlockHashes { + f.importMissingParent <- nextBlockHash + } + } // Invoke the testing hook if needed if f.importedHook != nil { f.importedHook(nil, block) @@ -906,6 +964,7 @@ func (f *BlockFetcher) forgetHash(hash common.Hash) { // forgetBlock removes all traces of a queued block from the fetcher's internal // state. +// The caller must hold the missingParentLock. func (f *BlockFetcher) forgetBlock(hash common.Hash) { if insert := f.queued[hash]; insert != nil { f.queues[insert.origin]-- @@ -913,5 +972,25 @@ func (f *BlockFetcher) forgetBlock(hash common.Hash) { delete(f.queues, insert.origin) } delete(f.queued, hash) + var parentHash common.Hash + if f.light { + parentHash = insert.header.ParentHash + } else { + parentHash = insert.block.ParentHash() + } + blocks := f.missingParent[parentHash] + for i, block := range blocks { + if block == hash { + // Swap with the last element then decrease the length + blocks[i] = blocks[len(blocks)-1] + blocks = blocks[:len(blocks)-1] + break + } + } + if len(blocks) == 0 { + delete(f.missingParent, parentHash) + } else { + f.missingParent[parentHash] = blocks + } } }