Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix block zero; load genesis file and use it for getGenesisHash and block 0 timestamp. #69

Merged
merged 9 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ func newCmd_rpc() *cli.Command {
EpochSearchConcurrency: epochSearchConcurrency,
})

defer func() {
if err := multi.Close(); err != nil {
klog.Errorf("error closing multi-epoch: %s", err.Error())
}
}()

for _, epoch := range epochs {
if err := multi.AddEpoch(epoch.Epoch(), epoch); err != nil {
return cli.Exit(fmt.Sprintf("failed to add epoch %d: %s", epoch.Epoch(), err.Error()), 1)
Expand Down
5 changes: 3 additions & 2 deletions cmd-x-index-all.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math/rand"
Expand Down Expand Up @@ -211,7 +212,7 @@ func createAllIndexes(
for {
_cid, sectionLength, block, err := rd.NextNode()
if err != nil {
if err == io.EOF {
if errors.Is(err, io.EOF) {
break
}
return nil, err
Expand Down Expand Up @@ -611,7 +612,7 @@ func verifyAllIndexes(
for {
_cid, sectionLength, block, err := rd.NextNode()
if err != nil {
if err == io.EOF {
if errors.Is(err, io.EOF) {
break
}
return err
Expand Down
18 changes: 18 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ type Config struct {
URI URI `json:"uri" yaml:"uri"`
} `json:"sig_exists" yaml:"sig_exists"`
} `json:"indexes" yaml:"indexes"`
Genesis struct {
URI URI `json:"uri" yaml:"uri"`
} `json:"genesis" yaml:"genesis"`
}

func (c *Config) ConfigFilepath() string {
Expand Down Expand Up @@ -296,5 +299,20 @@ func (c *Config) Validate() error {
}
}
}
{
// if epoch is 0, then the genesis URI must be set:
if *c.Epoch == 0 {
if c.Genesis.URI.IsZero() {
return fmt.Errorf("epoch is 0, but genesis.uri is not set")
}
if !c.Genesis.URI.IsValid() {
return fmt.Errorf("genesis.uri is invalid")
}
// only support local genesis files for now:
if !c.Genesis.URI.IsLocal() {
return fmt.Errorf("genesis.uri must be a local file")
}
}
}
return nil
}
69 changes: 54 additions & 15 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bufio"
"bytes"
"context"
"crypto/rand"
"encoding/binary"
Expand All @@ -12,6 +13,7 @@ import (

"github.com/gagliardetto/solana-go"
"github.com/ipfs/go-cid"
carv1 "github.com/ipld/go-car"
"github.com/ipld/go-car/util"
carv2 "github.com/ipld/go-car/v2"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -22,6 +24,7 @@ import (
"github.com/rpcpool/yellowstone-faithful/gsfa"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
"github.com/rpcpool/yellowstone-faithful/radiance/genesis"
"github.com/urfave/cli/v2"
"k8s.io/klog/v2"
)
Expand All @@ -30,20 +33,22 @@ type Epoch struct {
epoch uint64
isFilecoinMode bool // true if the epoch is in Filecoin mode (i.e. Lassie mode)
config *Config
// genesis:
genesis *GenesisContainer
// contains indexes and block data for the epoch
lassieFetcher *lassieWrapper
localCarReader *carv2.Reader
remoteCarReader ReaderAtCloser
remoteCarHeaderSize uint64
cidToOffsetIndex *compactindex.DB
slotToCidIndex *compactindex36.DB
sigToCidIndex *compactindex36.DB
sigExists *bucketteer.Reader
gsfaReader *gsfa.GsfaReader
cidToNodeCache *cache.Cache // TODO: prevent OOM
onClose []func() error
slotToCidCache *cache.Cache
cidToOffsetCache *cache.Cache
lassieFetcher *lassieWrapper
localCarReader *carv2.Reader
remoteCarReader ReaderAtCloser
carHeaderSize uint64
cidToOffsetIndex *compactindex.DB
slotToCidIndex *compactindex36.DB
sigToCidIndex *compactindex36.DB
sigExists *bucketteer.Reader
gsfaReader *gsfa.GsfaReader
cidToNodeCache *cache.Cache // TODO: prevent OOM
onClose []func() error
slotToCidCache *cache.Cache
cidToOffsetCache *cache.Cache
}

func (r *Epoch) getSlotToCidFromCache(slot uint64) (cid.Cid, error, bool) {
Expand Down Expand Up @@ -92,6 +97,10 @@ func (e *Epoch) Close() error {
return errors.Join(multiErr...)
}

func (e *Epoch) GetGenesis() *GenesisContainer {
return e.genesis
}

func NewEpochFromConfig(config *Config, c *cli.Context) (*Epoch, error) {
if config == nil {
return nil, fmt.Errorf("config must not be nil")
Expand All @@ -105,6 +114,19 @@ func NewEpochFromConfig(config *Config, c *cli.Context) (*Epoch, error) {
config: config,
onClose: make([]func() error, 0),
}
{
// if epoch is 0, then try loading the genesis from the config:
if *config.Epoch == 0 {
genesisConfig, ha, err := genesis.ReadGenesisFromFile(string(config.Genesis.URI))
if err != nil {
return nil, fmt.Errorf("failed to read genesis: %w", err)
}
ep.genesis = &GenesisContainer{
Hash: solana.HashFromBytes(ha[:]),
Config: genesisConfig,
}
}
}

if isCarMode {
// The CAR-mode requires a cid-to-offset index.
Expand Down Expand Up @@ -207,7 +229,7 @@ func NewEpochFromConfig(config *Config, c *cli.Context) (*Epoch, error) {
ep.localCarReader = localCarReader
ep.remoteCarReader = remoteCarReader
if remoteCarReader != nil {
// read 10 bytes from the CAR file to get the header size
// determine the header size so that we know where the data starts:
headerSizeBuf, err := readSectionFromReaderAt(remoteCarReader, 0, 10)
if err != nil {
return nil, fmt.Errorf("failed to read CAR header: %w", err)
Expand All @@ -217,7 +239,24 @@ func NewEpochFromConfig(config *Config, c *cli.Context) (*Epoch, error) {
if n <= 0 {
return nil, fmt.Errorf("failed to decode CAR header size")
}
ep.remoteCarHeaderSize = uint64(n) + headerSize
ep.carHeaderSize = uint64(n) + headerSize
}
if localCarReader != nil {
// determine the header size so that we know where the data starts:
dr, err := localCarReader.DataReader()
if err != nil {
return nil, fmt.Errorf("failed to get local CAR data reader: %w", err)
}
header, err := readHeader(dr)
if err != nil {
return nil, fmt.Errorf("failed to read local CAR header: %w", err)
}
var buf bytes.Buffer
if err = carv1.WriteHeader(header, &buf); err != nil {
return nil, fmt.Errorf("failed to encode local CAR header: %w", err)
}
headerSize := uint64(buf.Len())
ep.carHeaderSize = headerSize
}
}
{
Expand Down
12 changes: 12 additions & 0 deletions genesis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package main

import (
"github.com/gagliardetto/solana-go"
"github.com/rpcpool/yellowstone-faithful/radiance/genesis"
)

type GenesisContainer struct {
Hash solana.Hash
// The genesis config.
Config *genesis.Genesis
}
3 changes: 2 additions & 1 deletion gsfa/manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package manifest

import (
"encoding/binary"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -213,7 +214,7 @@ func (m *Manifest) readAllContent() (Values, error) {
values := make([][2]uint64, 0, currentContentSize/16)
for {
_, err := io.ReadFull(sectionReader, buf)
if err == io.EOF {
if errors.Is(err, io.EOF) {
break
}
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion index-cid-to-offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func CreateIndex_cid2offset(ctx context.Context, tmpDir string, carPath string,
for {
c, sectionLength, err := rd.NextInfo()
if err != nil {
if err == io.EOF {
if errors.Is(err, io.EOF) {
break
}
return "", err
Expand Down
82 changes: 54 additions & 28 deletions multiepoch-getBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,14 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
{
prefetcherFromCar := func() error {
parentIsInPreviousEpoch := CalcEpochForSlot(uint64(block.Meta.Parent_slot)) != CalcEpochForSlot(slot)
if slot == 0 {
parentIsInPreviousEpoch = true
}
if slot > 1 && block.Meta.Parent_slot == 0 {
parentIsInPreviousEpoch = true
}

var blockCid, parentCid cid.Cid
var blockCid, parentBlockCid cid.Cid
wg := new(errgroup.Group)
wg.Go(func() (err error) {
blockCid, err = epochHandler.FindCidFromSlot(ctx, slot)
Expand All @@ -84,7 +90,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
if parentIsInPreviousEpoch {
return nil
}
parentCid, err = epochHandler.FindCidFromSlot(ctx, uint64(block.Meta.Parent_slot))
parentBlockCid, err = epochHandler.FindCidFromSlot(ctx, uint64(block.Meta.Parent_slot))
if err != nil {
return err
}
Expand All @@ -94,7 +100,17 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
if err != nil {
return err
}
klog.Infof("%s -> %s", parentCid, blockCid)
if slot == 0 {
klog.Infof("car start to slot(0)::%s", blockCid)
} else {
klog.Infof(
"slot(%d)::%s to slot(%d)::%s",
uint64(block.Meta.Parent_slot),
parentBlockCid,
slot,
blockCid,
)
}
{
var blockOffset, parentOffset uint64
wg := new(errgroup.Group)
Expand All @@ -108,13 +124,12 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
wg.Go(func() (err error) {
if parentIsInPreviousEpoch {
// get car file header size
parentOffset = epochHandler.remoteCarHeaderSize
parentOffset = epochHandler.carHeaderSize
return nil
}
parentOffset, err = epochHandler.FindOffsetFromCid(ctx, parentCid)
parentOffset, err = epochHandler.FindOffsetFromCid(ctx, parentBlockCid)
if err != nil {
// If the parent is not found, it (probably) means that it's outside of the car file.
parentOffset = epochHandler.remoteCarHeaderSize
return err
}
return nil
})
Expand All @@ -125,41 +140,27 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex

length := blockOffset - parentOffset
MiB := uint64(1024 * 1024)
maxSize := MiB * 100
if length > maxSize {
length = maxSize
maxPrefetchSize := MiB * 10 // let's cap prefetching size
if length > maxPrefetchSize {
length = maxPrefetchSize
}

idealEntrySize := uint64(36190)
var start uint64
if parentIsInPreviousEpoch {
start = parentOffset
} else {
if parentOffset > idealEntrySize {
start = parentOffset - idealEntrySize
} else {
start = parentOffset
}
length += idealEntrySize
}
start := parentOffset

klog.Infof("prefetching CAR: start=%d length=%d (parent_offset=%d)", start, length, parentOffset)
carSection, err := epochHandler.ReadAtFromCar(ctx, start, length)
if err != nil {
return err
}
dr := bytes.NewReader(carSection)
if !parentIsInPreviousEpoch {
dr.Seek(int64(idealEntrySize), io.SeekStart)
}
br := bufio.NewReader(dr)

gotCid, data, err := util.ReadNode(br)
if err != nil {
return fmt.Errorf("failed to read first node: %w", err)
}
if !parentIsInPreviousEpoch && !gotCid.Equals(parentCid) {
return fmt.Errorf("CID mismatch: expected %s, got %s", parentCid, gotCid)
if !parentIsInPreviousEpoch && !gotCid.Equals(parentBlockCid) {
return fmt.Errorf("CID mismatch: expected %s, got %s", parentBlockCid, gotCid)
}
epochHandler.putNodeInCache(gotCid, data)

Expand Down Expand Up @@ -388,6 +389,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
allTransactions = append(allTransactions, txResp)
}
}

sort.Slice(allTransactions, func(i, j int) bool {
return allTransactions[i].Position < allTransactions[j].Position
})
Expand All @@ -399,6 +401,21 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
blockResp.ParentSlot = uint64(block.Meta.Parent_slot)
blockResp.Rewards = rewards

if slot == 0 {
genesis := epochHandler.GetGenesis()
if genesis != nil {
blockZeroBlocktime := uint64(genesis.Config.CreationTime.Unix())
blockResp.BlockTime = &blockZeroBlocktime
}
blockResp.ParentSlot = uint64(0)

zeroBlockHeight := uint64(0)
blockResp.BlockHeight = &zeroBlockHeight

blockZeroBlockHash := lastEntryHash.String()
blockResp.PreviousBlockhash = &blockZeroBlockHash // NOTE: this is what solana RPC does. Should it be nil instead? Or should it be the genesis hash?
}

{
blockHeight, ok := block.GetBlockHeight()
if ok {
Expand All @@ -408,7 +425,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
{
// get parent slot
parentSlot := uint64(block.Meta.Parent_slot)
if parentSlot != 0 && CalcEpochForSlot(parentSlot) == epochNumber {
if (parentSlot != 0 || slot == 1) && CalcEpochForSlot(parentSlot) == epochNumber {
// NOTE: if the parent is in the same epoch, we can get it from the same epoch handler as the block;
// otherwise, we need to get it from the previous epoch (TODO: implement this)
parentBlock, err := epochHandler.GetBlock(WithSubrapghPrefetch(ctx, false), parentSlot)
Expand Down Expand Up @@ -437,6 +454,15 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
}
tim.time("get parent block")

{
if len(blockResp.Transactions) == 0 {
blockResp.Transactions = make([]GetTransactionResponse, 0)
}
if blockResp.Rewards == nil || len(blockResp.Rewards.([]any)) == 0 {
blockResp.Rewards = make([]any, 0)
}
}

err = conn.Reply(
ctx,
req.ID,
Expand Down
Loading