Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dedup find missed accessors logic #13344

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 15 additions & 43 deletions erigon-lib/state/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,57 +1215,32 @@ func (d *Domain) buildAccessor(ctx context.Context, fromStep, toStep uint64, dat
}

func (d *Domain) missedBtreeAccessors() (l []*filesItem) {
d.dirtyFiles.Walk(func(items []*filesItem) bool { // don't run slow logic while iterating on btree
for _, item := range items {
fromStep, toStep := item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep
fPath := d.kvBtFilePath(fromStep, toStep)
exists, err := dir.FileExist(fPath)
if err != nil {
panic(err)
}
if !exists {
l = append(l, item)
continue
}
fPath = d.kvExistenceIdxFilePath(fromStep, toStep)
exists, err = dir.FileExist(fPath)
if err != nil {
panic(err)
}
if !exists {
l = append(l, item)
continue
}
return fileItemsWithMissingAccessors(d.dirtyFiles, d.aggregationStep, func(fromStep uint64, toStep uint64) []string {
var accessors []string
if d.IndexList.Has(AccessorBTree) {
accessors = append(accessors, d.kvBtFilePath(fromStep, toStep))
}
return true
if d.IndexList.Has(AccessorExistence) {
accessors = append(accessors, d.kvExistenceIdxFilePath(fromStep, toStep))
}

return accessors
})
return l
}

func (d *Domain) missedAccessors() (l []*filesItem) {
d.dirtyFiles.Walk(func(items []*filesItem) bool { // don't run slow logic while iterating on btree
for _, item := range items {
fromStep, toStep := item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep
fPath := d.kvAccessorFilePath(fromStep, toStep)
exists, err := dir.FileExist(fPath)
if err != nil {
panic(err)
}
if !exists {
l = append(l, item)
}
}
return true
if !d.IndexList.Has(AccessorHashMap) {
return nil
}
return fileItemsWithMissingAccessors(d.dirtyFiles, d.aggregationStep, func(fromStep uint64, toStep uint64) []string {
return []string{d.kvAccessorFilePath(fromStep, toStep)}
})
return l
}

// BuildMissedAccessors - produce .efi/.vi/.kvi from .ef/.v/.kv
func (d *Domain) BuildMissedAccessors(ctx context.Context, g *errgroup.Group, ps *background.ProgressSet) {
d.History.BuildMissedAccessors(ctx, g, ps)
for _, item := range d.missedBtreeAccessors() {
if d.IndexList&AccessorBTree == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So yes, that means we do not create the btree index if the flag bit is 0; index creation is optional.

continue
}
if item.decompressor == nil {
log.Warn(fmt.Sprintf("[dbg] BuildMissedAccessors: item with nil decompressor %s %d-%d", d.filenameBase, item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep))
}
Expand All @@ -1281,9 +1256,6 @@ func (d *Domain) BuildMissedAccessors(ctx context.Context, g *errgroup.Group, ps
})
}
for _, item := range d.missedAccessors() {
if d.IndexList&AccessorHashMap == 0 {
continue
}
if item.decompressor == nil {
log.Warn(fmt.Sprintf("[dbg] BuildMissedAccessors: item with nil decompressor %s %d-%d", d.filenameBase, item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep))
}
Expand Down
21 changes: 21 additions & 0 deletions erigon-lib/state/files_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

btree2 "github.com/tidwall/btree"

"github.com/erigontech/erigon-lib/common/dir"
"github.com/erigontech/erigon-lib/config3"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon-lib/recsplit"
Expand Down Expand Up @@ -354,3 +355,23 @@ func (files visibleFiles) MergedRanges() []MergeRange {
}
return res
}

// fileItemsWithMissingAccessors returns the items of dirtyFiles for which at
// least one expected accessor file does not exist on disk.
//
// For every item the step range is computed as startTxNum/aggregationStep and
// endTxNum/aggregationStep, and accessorsFor is called with that range to get
// the accessor file paths to check. An item is appended to the result at most
// once, as soon as its first missing path is found; if accessorsFor returns no
// paths the item is never reported. Items are returned in Walk order.
//
// Panics if dir.FileExist returns an error.
func fileItemsWithMissingAccessors(dirtyFiles *btree2.BTreeG[*filesItem], aggregationStep uint64, accessorsFor func(fromStep, toStep uint64) []string) (l []*filesItem) {
	// Copy the item pointers out first so that no file-system calls run inside
	// the Walk callback: the tree may keep its internal lock for the whole
	// Walk (when locking is enabled), and disk I/O there would stall writers.
	var snapshot []*filesItem
	dirtyFiles.Walk(func(items []*filesItem) bool {
		snapshot = append(snapshot, items...)
		return true
	})

	for _, item := range snapshot {
		fromStep, toStep := item.startTxNum/aggregationStep, item.endTxNum/aggregationStep
		for _, fName := range accessorsFor(fromStep, toStep) {
			exists, err := dir.FileExist(fName)
			if err != nil {
				panic(err)
			}
			if !exists {
				// One missing accessor is enough; do not report the item twice.
				l = append(l, item)
				break
			}
		}
	}
	return l
}
57 changes: 57 additions & 0 deletions erigon-lib/state/files_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package state

import (
"fmt"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
btree2 "github.com/tidwall/btree"
)

// TestFileItemWithMissingAccessor checks that fileItemsWithMissingAccessors
// reports only the file item whose accessor files are absent on disk.
func TestFileItemWithMissingAccessor(t *testing.T) {
	// t.TempDir is removed together with its contents when the test finishes,
	// so the fixture files below need no explicit cleanup.
	tmp := t.TempDir()

	f1 := &filesItem{
		startTxNum: 1,
		endTxNum:   10,
	}
	f2 := &filesItem{
		startTxNum: 11,
		endTxNum:   20,
	}
	f3 := &filesItem{
		startTxNum: 31,
		endTxNum:   40,
	}
	aggStep := uint64(10)

	tree := btree2.NewBTreeGOptions(filesItemLess, btree2.Options{Degree: 128, NoLocks: false})
	tree.Set(f1)
	tree.Set(f2)
	tree.Set(f3)

	// Every file item expects two accessor files, named after its step range.
	accessorFor := func(fromStep, toStep uint64) []string {
		return []string{
			filepath.Join(tmp, fmt.Sprintf("testacc_%d_%d.bin", fromStep, toStep)),
			filepath.Join(tmp, fmt.Sprintf("testacc2_%d_%d.bin", fromStep, toStep)),
		}
	}

	// Create accessor files for f1 and f2 only; f3 is left without any.
	for _, item := range []*filesItem{f1, f2} {
		for _, fname := range accessorFor(item.startTxNum/aggStep, item.endTxNum/aggStep) {
			// Fail fast on a broken fixture instead of producing a misleading result.
			require.NoError(t, os.WriteFile(fname, []byte("test"), 0644))
		}
	}

	fileItems := fileItemsWithMissingAccessors(tree, aggStep, accessorFor)
	require.Len(t, fileItems, 1)
	require.Equal(t, f3, fileItems[0])
}
22 changes: 7 additions & 15 deletions erigon-lib/state/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,21 +315,14 @@ func (ht *HistoryRoTx) Files() (res []string) {
}

func (h *History) missedAccessors() (l []*filesItem) {
h.dirtyFiles.Walk(func(items []*filesItem) bool { // don't run slow logic while iterating on btree
for _, item := range items {
fromStep, toStep := item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep
exists, err := dir.FileExist(h.vAccessorFilePath(fromStep, toStep))
if err != nil {
_, fName := filepath.Split(h.vAccessorFilePath(fromStep, toStep))
h.logger.Warn("[agg] History.missedAccessors", "err", err, "f", fName)
}
if !exists {
l = append(l, item)
}
if !h.indexList.Has(AccessorHashMap) {
return nil
}
return fileItemsWithMissingAccessors(h.dirtyFiles, h.aggregationStep, func(fromStep, toStep uint64) []string {
return []string{
h.vAccessorFilePath(fromStep, toStep),
}
return true
})
return l
}

func (h *History) buildVi(ctx context.Context, item *filesItem, ps *background.ProgressSet) (err error) {
Expand Down Expand Up @@ -438,8 +431,7 @@ func (h *History) buildVI(ctx context.Context, historyIdxPath string, hist, efHi

func (h *History) BuildMissedAccessors(ctx context.Context, g *errgroup.Group, ps *background.ProgressSet) {
h.InvertedIndex.BuildMissedAccessors(ctx, g, ps)
missedFiles := h.missedAccessors()
for _, item := range missedFiles {
for _, item := range h.missedAccessors() {
item := item
g.Go(func() error {
return h.buildVi(ctx, item, ps)
Expand Down
19 changes: 6 additions & 13 deletions erigon-lib/state/inverted_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,21 +215,14 @@ func (ii *InvertedIndex) reCalcVisibleFiles(toTxNum uint64) {
}

func (ii *InvertedIndex) missedAccessors() (l []*filesItem) {
ii.dirtyFiles.Walk(func(items []*filesItem) bool {
for _, item := range items {
fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep
exists, err := dir.FileExist(ii.efAccessorFilePath(fromStep, toStep))
if err != nil {
_, fName := filepath.Split(ii.efAccessorFilePath(fromStep, toStep))
ii.logger.Warn("[agg] InvertedIndex missedAccessors", "err", err, "f", fName)
}
if !exists {
l = append(l, item)
}
if !ii.indexList.Has(AccessorHashMap) {
return nil
}
return fileItemsWithMissingAccessors(ii.dirtyFiles, ii.aggregationStep, func(fromStep, toStep uint64) []string {
return []string{
ii.efAccessorFilePath(fromStep, toStep),
}
return true
})
return l
}

func (ii *InvertedIndex) buildEfAccessor(ctx context.Context, item *filesItem, ps *background.ProgressSet) (err error) {
Expand Down
Loading