Skip to content

Commit

Permalink
add Silkworm poller for Antelope EVM (#108)
Browse files Browse the repository at this point in the history
* add Silkworm poller for Antelope EVM

* remove config.yaml
  • Loading branch information
fschoell authored Dec 14, 2023
1 parent 1647451 commit 1de921f
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 3 deletions.
123 changes: 123 additions & 0 deletions blockfetcher/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package blockfetcher

import (
"context"
"fmt"
"sync"
"time"

"github.com/abourget/llerrgroup"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/eth-go/rpc"
pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

type ToEthBlock func(in *rpc.Block, receipts map[string]*rpc.TransactionReceipt) (*pbeth.Block, map[string]bool)

type BlockFetcher struct {
rpcClient *rpc.Client
latest uint64
latestBlockRetryInterval time.Duration
fetchInterval time.Duration
toEthBlock ToEthBlock
lastFetchAt time.Time
logger *zap.Logger
}

func NewBlockFetcher(rpcClient *rpc.Client, intervalBetweenFetch, latestBlockRetryInterval time.Duration, toEthBlock ToEthBlock, logger *zap.Logger) *BlockFetcher {
return &BlockFetcher{
rpcClient: rpcClient,
latestBlockRetryInterval: latestBlockRetryInterval,
toEthBlock: toEthBlock,
fetchInterval: intervalBetweenFetch,
logger: logger,
}
}

func (f *BlockFetcher) Fetch(ctx context.Context, blockNum uint64) (block *pbbstream.Block, err error) {
f.logger.Debug("fetching block", zap.Uint64("block_num", blockNum))
for f.latest < blockNum {
f.latest, err = f.rpcClient.LatestBlockNum(ctx)
if err != nil {
return nil, fmt.Errorf("fetching latest block num: %w", err)
}

f.logger.Info("got latest block", zap.Uint64("latest", f.latest), zap.Uint64("block_num", blockNum))

if f.latest < blockNum {
time.Sleep(f.latestBlockRetryInterval)
continue
}
break
}

sinceLastFetch := time.Since(f.lastFetchAt)
if sinceLastFetch < f.fetchInterval {
time.Sleep(f.fetchInterval - sinceLastFetch)
}

rpcBlock, err := f.rpcClient.GetBlockByNumber(ctx, rpc.BlockNumber(blockNum), rpc.WithGetBlockFullTransaction())
if err != nil {
return nil, fmt.Errorf("fetching block %d: %w", blockNum, err)
}

receipts, err := FetchReceipts(ctx, rpcBlock, f.rpcClient)
if err != nil {
return nil, fmt.Errorf("fetching receipts for block %d %q: %w", rpcBlock.Number, rpcBlock.Hash.Pretty(), err)
}

f.logger.Debug("fetched receipts", zap.Int("count", len(receipts)))

f.lastFetchAt = time.Now()

if err != nil {
return nil, fmt.Errorf("fetching logs for block %d %q: %w", rpcBlock.Number, rpcBlock.Hash.Pretty(), err)
}

ethBlock, _ := f.toEthBlock(rpcBlock, receipts)
anyBlock, err := anypb.New(ethBlock)
if err != nil {
return nil, fmt.Errorf("create any block: %w", err)
}

return &pbbstream.Block{
Number: ethBlock.Number,
Id: ethBlock.GetFirehoseBlockID(),
ParentId: ethBlock.GetFirehoseBlockParentID(),
Timestamp: timestamppb.New(ethBlock.GetFirehoseBlockTime()),
LibNum: ethBlock.LIBNum(),
ParentNum: ethBlock.GetFirehoseBlockParentNumber(),
Payload: anyBlock,
}, nil
}

func FetchReceipts(ctx context.Context, block *rpc.Block, client *rpc.Client) (out map[string]*rpc.TransactionReceipt, err error) {
out = make(map[string]*rpc.TransactionReceipt)
lock := sync.Mutex{}

eg := llerrgroup.New(10)
for _, tx := range block.Transactions.Transactions {
if eg.Stop() {
continue // short-circuit the loop if we got an error
}
eg.Go(func() error {
receipt, err := client.TransactionReceipt(ctx, tx.Hash)
if err != nil {
return fmt.Errorf("fetching receipt for tx %q: %w", tx.Hash.Pretty(), err)
}
lock.Lock()
out[tx.Hash.Pretty()] = receipt
lock.Unlock()
return err
})
}

if err := eg.Wait(); err != nil {
return nil, err
}

return
}
29 changes: 29 additions & 0 deletions blockfetcher/silkworm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package blockfetcher

import (
"context"
"time"

"go.uber.org/zap"

pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/eth-go/rpc"
"github.com/streamingfast/firehose-ethereum/block"
)

type SilkwormBlockFetcher struct {
fetcher *BlockFetcher
}

func NewSilkwormBlockFetcher(rpcClient *rpc.Client, intervalBetweenFetch time.Duration, latestBlockRetryInterval time.Duration, logger *zap.Logger) *SilkwormBlockFetcher {
fetcher := NewBlockFetcher(rpcClient, intervalBetweenFetch, latestBlockRetryInterval, block.RpcToEthBlock, logger)
return &SilkwormBlockFetcher{
fetcher: fetcher,
}
}

func (f *SilkwormBlockFetcher) PollingInterval() time.Duration { return 1 * time.Second }

func (f *SilkwormBlockFetcher) Fetch(ctx context.Context, blockNum uint64) (*pbbstream.Block, error) {
return f.fetcher.Fetch(ctx, blockNum)
}
4 changes: 3 additions & 1 deletion cmd/fireantelope/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ func Chain() *firecore.Chain[*pbantelope.Block] {

Tools: &firecore.ToolsConfig[*pbantelope.Block]{

RegisterExtraCmd: func(chain *firecore.Chain[*pbantelope.Block], toolsCmd *cobra.Command, zlog *zap.Logger, tracer logging.Tracer) error {
RegisterExtraCmd: func(chain *firecore.Chain[*pbantelope.Block], parent *cobra.Command, zlog *zap.Logger, tracer logging.Tracer) error {
//toolsCmd.AddCommand(newToolsGenerateNodeKeyCmd(chain))
//toolsCmd.AddCommand(newToolsBackfillCmd(zlog))
parent.AddCommand(newPollerCmd(zlog, tracer))
parent.AddCommand(newSilkwormPollerCmd(zlog, tracer))

return nil
},
Expand Down
79 changes: 79 additions & 0 deletions cmd/fireantelope/poller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package main

import (
"fmt"
"github.com/pinax-network/firehose-antelope/blockfetcher"
"path"
"strconv"
"time"

"github.com/spf13/cobra"
"github.com/streamingfast/bstream"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/eth-go/rpc"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/blockpoller"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

func newPollerCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd := &cobra.Command{
Use: "poller",
Short: "poll blocks from different sources",
}

cmd.AddCommand(newSilkwormPollerCmd(logger, tracer))
return cmd
}

func newSilkwormPollerCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd := &cobra.Command{
Use: "silkworm <rpc-endpoint> <first-streamable-block>",
Short: "poll blocks from silkworm rpc",
Args: cobra.ExactArgs(2),
RunE: pollerRunE(logger, tracer),
}
cmd.Flags().Duration("interval-between-fetch", 0, "interval between fetch")

return cmd
}

func pollerRunE(logger *zap.Logger, tracer logging.Tracer) firecore.CommandExecutor {
return func(cmd *cobra.Command, args []string) (err error) {
ctx := cmd.Context()

rpcEndpoint := args[0]

dataDir := sflags.MustGetString(cmd, "data-dir")
stateDir := path.Join(dataDir, "poller-state")

logger.Info("launching firehose-antelope poller", zap.String("rpc_endpoint", rpcEndpoint), zap.String("data_dir", dataDir), zap.String("state_dir", stateDir))

rpcClient := rpc.NewClient(rpcEndpoint)

firstStreamableBlock, err := strconv.ParseUint(args[1], 10, 64)
if err != nil {
return fmt.Errorf("unable to parse first streamable block %d: %w", firstStreamableBlock, err)
}

fetchInterval := sflags.MustGetDuration(cmd, "interval-between-fetch")

fetcher := blockfetcher.NewSilkwormBlockFetcher(rpcClient, fetchInterval, 1*time.Second, logger)
handler := blockpoller.NewFireBlockHandler("type.googleapis.com/sf.ethereum.type.v2.Block")
poller := blockpoller.New(fetcher, handler, blockpoller.WithStoringState(stateDir), blockpoller.WithLogger(logger))

// there is currently no support for rpc.FinalizedBlock on eos evm, so we use the latest one
latestBlock, err := rpcClient.GetBlockByNumber(ctx, rpc.LatestBlock)
if err != nil {
return fmt.Errorf("getting latest block: %w", err)
}

err = poller.Run(ctx, firstStreamableBlock, bstream.NewBlockRef(latestBlock.Hash.String(), uint64(latestBlock.Number)))
if err != nil {
return fmt.Errorf("running poller: %w", err)
}

return nil
}
}
8 changes: 6 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ go 1.21
toolchain go1.21.5

require (
github.com/abourget/llerrgroup v0.2.0
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883
github.com/eoscanada/eos-go v0.10.3-0.20231109144819-59afdfa3a37d
github.com/lytics/ordpool v0.0.0-20130426221837-8d833f097fe7
github.com/mitchellh/go-testing-interface v1.14.1
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/streamingfast/bstream v0.0.2-0.20231211192436-01f6a005b0e4
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545
github.com/streamingfast/eth-go v0.0.0-20231204174036-34e2cb6d64af
github.com/streamingfast/firehose-core v1.0.0
github.com/streamingfast/firehose-ethereum v1.4.23-0.20231213134745-920366d3b7aa
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091
github.com/stretchr/testify v1.8.4
Expand Down Expand Up @@ -45,14 +49,14 @@ require (
github.com/RoaringBitmap/roaring v0.9.4 // indirect
github.com/ShinyTrinkets/meta-logger v0.2.0 // indirect
github.com/ShinyTrinkets/overseer v0.3.0 // indirect
github.com/abourget/llerrgroup v0.2.0 // indirect
github.com/aws/aws-sdk-go v1.44.325 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.3.1 // indirect
github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/bobg/go-generics/v2 v2.1.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/bufbuild/connect-go v1.10.0 // indirect
github.com/bufbuild/connect-grpchealth-go v1.1.1 // indirect
github.com/bufbuild/connect-grpcreflect-go v1.0.0 // indirect
Expand Down Expand Up @@ -97,6 +101,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/uint256 v1.2.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/ipfs/boxo v0.8.0 // indirect
github.com/ipfs/go-cid v0.4.0 // indirect
Expand Down Expand Up @@ -157,7 +162,6 @@ require (
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.15.0 // indirect
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80 // indirect
github.com/streamingfast/dauth v0.0.0-20231120142446-843f4e045cc2 // indirect
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c // indirect
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bobg/go-generics/v2 v2.1.1 h1:4rN9upY6Xm4TASSMeH+NzUghgO4h/SbNrQphIjRd/R0=
github.com/bobg/go-generics/v2 v2.1.1/go.mod h1:iPMSRVFlzkJSYOCXQ0n92RA3Vxw0RBv2E8j9ZODXgHk=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
github.com/bufbuild/connect-go v1.10.0 h1:QAJ3G9A1OYQW2Jbk3DeoJbkCxuKArrvZgDt47mjdTbg=
github.com/bufbuild/connect-go v1.10.0/go.mod h1:CAIePUgkDR5pAFaylSMtNK45ANQjp9JvpluG20rhpV8=
github.com/bufbuild/connect-grpchealth-go v1.1.1 h1:ldceS3m7+Qvl3GI4yzB4oCg3uOdD+Y1bytc/5xuMpqo=
Expand Down Expand Up @@ -388,6 +392,8 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/holiman/uint256 v1.2.0 h1:gpSYcPLWGv4sG43I2mVLiDZCNDh/EpGjSk8tmtxitHM=
github.com/holiman/uint256 v1.2.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
Expand Down Expand Up @@ -606,8 +612,12 @@ github.com/streamingfast/dstore v0.1.1-0.20230620124109-3924b3b36c77 h1:u7FWLqz3
github.com/streamingfast/dstore v0.1.1-0.20230620124109-3924b3b36c77/go.mod h1:ngKU7WzHwVjOFpt2g+Wtob5mX4IvN90HYlnARcTRbmQ=
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839 h1:K6mJPvh1jAL+/gBS7Bh9jyzWaTib6N47m06gZOTUPwQ=
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns=
github.com/streamingfast/eth-go v0.0.0-20231204174036-34e2cb6d64af h1:KH0tXSwKrsbNZ9RCKv+Fp4riu7ytzrc/hB3kXkwn0/Q=
github.com/streamingfast/eth-go v0.0.0-20231204174036-34e2cb6d64af/go.mod h1:UEm8dqibr3c3A1iIA3CHpkhN7j3X78prN7/55sXf3A0=
github.com/streamingfast/firehose-core v1.0.0 h1:6TV9M4JWTGkqI1Ocmn2ula4LLRq/WGfgLSJYH3wi+wc=
github.com/streamingfast/firehose-core v1.0.0/go.mod h1:y6GgxrGypjPyY25O/LBRIu2S1aZhjpMVX5AGLsqch0k=
github.com/streamingfast/firehose-ethereum v1.4.23-0.20231213134745-920366d3b7aa h1:doUV6rNUz/A+Y8wvNT/2k2qUKF02kuCRvaKgr/S2d3k=
github.com/streamingfast/firehose-ethereum v1.4.23-0.20231213134745-920366d3b7aa/go.mod h1:LYBs6siIpmpKyUsSUiziYr6YkW+s5HKCTzzuMiCfjK0=
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 h1:g8eEYbFSykyzIyuxNMmHEUGGUvJE0ivmqZagLDK42gw=
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0/go.mod h1:cTNObq2Uofb330y05JbbZZ6RwE6QUXw5iVcHk1Fx3fk=
github.com/streamingfast/logging v0.0.0-20210811175431-f3b44b61606a/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo=
Expand Down

0 comments on commit 1de921f

Please sign in to comment.