Skip to content

Commit

Permalink
Fix prefetching
Browse files Browse the repository at this point in the history
  • Loading branch information
gagliardetto committed Nov 16, 2023
1 parent 929a512 commit 4d41ba3
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 44 deletions.
6 changes: 6 additions & 0 deletions cmd-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ func newCmd_rpc() *cli.Command {
EpochSearchConcurrency: epochSearchConcurrency,
})

defer func() {
if err := multi.Close(); err != nil {
klog.Errorf("error closing multi-epoch: %s", err.Error())
}
}()

for _, epoch := range epochs {
if err := multi.AddEpoch(epoch.Epoch(), epoch); err != nil {
return cli.Exit(fmt.Sprintf("failed to add epoch %d: %s", epoch.Epoch(), err.Error()), 1)
Expand Down
49 changes: 34 additions & 15 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bufio"
"bytes"
"context"
"crypto/rand"
"encoding/binary"
Expand All @@ -12,6 +13,7 @@ import (

"github.com/gagliardetto/solana-go"
"github.com/ipfs/go-cid"
carv1 "github.com/ipld/go-car"
"github.com/ipld/go-car/util"
carv2 "github.com/ipld/go-car/v2"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -34,19 +36,19 @@ type Epoch struct {
// genesis:
genesis *GenesisContainer
// contains indexes and block data for the epoch
lassieFetcher *lassieWrapper
localCarReader *carv2.Reader
remoteCarReader ReaderAtCloser
remoteCarHeaderSize uint64
cidToOffsetIndex *compactindex.DB
slotToCidIndex *compactindex36.DB
sigToCidIndex *compactindex36.DB
sigExists *bucketteer.Reader
gsfaReader *gsfa.GsfaReader
cidToNodeCache *cache.Cache // TODO: prevent OOM
onClose []func() error
slotToCidCache *cache.Cache
cidToOffsetCache *cache.Cache
lassieFetcher *lassieWrapper
localCarReader *carv2.Reader
remoteCarReader ReaderAtCloser
carHeaderSize uint64
cidToOffsetIndex *compactindex.DB
slotToCidIndex *compactindex36.DB
sigToCidIndex *compactindex36.DB
sigExists *bucketteer.Reader
gsfaReader *gsfa.GsfaReader
cidToNodeCache *cache.Cache // TODO: prevent OOM
onClose []func() error
slotToCidCache *cache.Cache
cidToOffsetCache *cache.Cache
}

func (r *Epoch) getSlotToCidFromCache(slot uint64) (cid.Cid, error, bool) {
Expand Down Expand Up @@ -227,7 +229,7 @@ func NewEpochFromConfig(config *Config, c *cli.Context) (*Epoch, error) {
ep.localCarReader = localCarReader
ep.remoteCarReader = remoteCarReader
if remoteCarReader != nil {
// read 10 bytes from the CAR file to get the header size
// determine the header size so that we know where the data starts:
headerSizeBuf, err := readSectionFromReaderAt(remoteCarReader, 0, 10)
if err != nil {
return nil, fmt.Errorf("failed to read CAR header: %w", err)
Expand All @@ -237,7 +239,24 @@ func NewEpochFromConfig(config *Config, c *cli.Context) (*Epoch, error) {
if n <= 0 {
return nil, fmt.Errorf("failed to decode CAR header size")
}
ep.remoteCarHeaderSize = uint64(n) + headerSize
ep.carHeaderSize = uint64(n) + headerSize
}
if localCarReader != nil {
// determine the header size so that we know where the data starts:
dr, err := localCarReader.DataReader()
if err != nil {
return nil, fmt.Errorf("failed to get local CAR data reader: %w", err)
}
header, err := readHeader(dr)
if err != nil {
return nil, fmt.Errorf("failed to read local CAR header: %w", err)
}
var buf bytes.Buffer
if err = carv1.WriteHeader(header, &buf); err != nil {
return nil, fmt.Errorf("failed to encode local CAR header: %w", err)
}
headerSize := uint64(buf.Len())
ep.carHeaderSize = headerSize
}
}
{
Expand Down
56 changes: 27 additions & 29 deletions multiepoch-getBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,15 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
tim.time("GetBlock")
{
prefetcherFromCar := func() error {
parentIsInPreviousEpoch := CalcEpochForSlot(uint64(block.Meta.Parent_slot)) != CalcEpochForSlot(slot)
if slot == 0 {
return nil
parentIsInPreviousEpoch = true
}
if slot > 1 && block.Meta.Parent_slot == 0 {
parentIsInPreviousEpoch = true
}
parentIsInPreviousEpoch := CalcEpochForSlot(uint64(block.Meta.Parent_slot)) != CalcEpochForSlot(slot)

var blockCid, parentCid cid.Cid
var blockCid, parentBlockCid cid.Cid
wg := new(errgroup.Group)
wg.Go(func() (err error) {
blockCid, err = epochHandler.FindCidFromSlot(ctx, slot)
Expand All @@ -87,7 +90,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
if parentIsInPreviousEpoch {
return nil
}
parentCid, err = epochHandler.FindCidFromSlot(ctx, uint64(block.Meta.Parent_slot))
parentBlockCid, err = epochHandler.FindCidFromSlot(ctx, uint64(block.Meta.Parent_slot))
if err != nil {
return err
}
Expand All @@ -97,7 +100,17 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
if err != nil {
return err
}
klog.Infof("%s -> %s", parentCid, blockCid)
if slot == 0 {
klog.Infof("car start to slot(0)::%s", blockCid)
} else {
klog.Infof(
"slot(%d)::%s to slot(%d)::%s",
uint64(block.Meta.Parent_slot),
parentBlockCid,
slot,
blockCid,
)
}
{
var blockOffset, parentOffset uint64
wg := new(errgroup.Group)
Expand All @@ -111,13 +124,12 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
wg.Go(func() (err error) {
if parentIsInPreviousEpoch {
// get car file header size
parentOffset = epochHandler.remoteCarHeaderSize
parentOffset = epochHandler.carHeaderSize
return nil
}
parentOffset, err = epochHandler.FindOffsetFromCid(ctx, parentCid)
parentOffset, err = epochHandler.FindOffsetFromCid(ctx, parentBlockCid)
if err != nil {
// If the parent is not found, it (probably) means that it's outside of the car file.
parentOffset = epochHandler.remoteCarHeaderSize
return err
}
return nil
})
Expand All @@ -128,41 +140,27 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex

length := blockOffset - parentOffset
MiB := uint64(1024 * 1024)
maxSize := MiB * 100
if length > maxSize {
length = maxSize
maxPrefetchSize := MiB * 10 // let's cap prefetching size
if length > maxPrefetchSize {
length = maxPrefetchSize
}

idealEntrySize := uint64(36190)
var start uint64
if parentIsInPreviousEpoch {
start = parentOffset
} else {
if parentOffset > idealEntrySize {
start = parentOffset - idealEntrySize
} else {
start = parentOffset
}
length += idealEntrySize
}
start := parentOffset

klog.Infof("prefetching CAR: start=%d length=%d (parent_offset=%d)", start, length, parentOffset)
carSection, err := epochHandler.ReadAtFromCar(ctx, start, length)
if err != nil {
return err
}
dr := bytes.NewReader(carSection)
if !parentIsInPreviousEpoch {
dr.Seek(int64(idealEntrySize), io.SeekStart)
}
br := bufio.NewReader(dr)

gotCid, data, err := util.ReadNode(br)
if err != nil {
return fmt.Errorf("failed to read first node: %w", err)
}
if !parentIsInPreviousEpoch && !gotCid.Equals(parentCid) {
return fmt.Errorf("CID mismatch: expected %s, got %s", parentCid, gotCid)
if !parentIsInPreviousEpoch && !gotCid.Equals(parentBlockCid) {
return fmt.Errorf("CID mismatch: expected %s, got %s", parentBlockCid, gotCid)
}
epochHandler.putNodeInCache(gotCid, data)

Expand Down
10 changes: 10 additions & 0 deletions multiepoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,16 @@ func (m *MultiEpoch) GetFirstAvailableEpochNumber() (uint64, error) {
return 0, fmt.Errorf("no epochs available")
}

func (m *MultiEpoch) Close() error {
m.mu.Lock()
defer m.mu.Unlock()
klog.Info("Closing all epochs...")
for _, ep := range m.epochs {
ep.Close()
}
return nil
}

type ListenerConfig struct {
ProxyConfig *ProxyConfig
}
Expand Down

0 comments on commit 4d41ba3

Please sign in to comment.