Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Executor block by block #909

Merged
merged 39 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
45b928c
using batch-by-batch + latest changes from zkevm
kstoykov Jul 29, 2024
339d766
manual "cherry-pick" a2b17f7ceff0ec41283d4f8b404af253855656dc
kstoykov Jul 29, 2024
93a4207
manual "cherry-pick" a2b17f7ceff0ec41283d4f8b404af253855656dc
kstoykov Jul 29, 2024
4f7e4b8
prepare sequencer for new limbo recovery
kstoykov Jul 29, 2024
e867903
organize var declarations
kstoykov Jul 29, 2024
0515db5
Merge branch 'zkevm' into executor-block-by-block
kstoykov Jul 29, 2024
96558b2
restore interface
kstoykov Jul 29, 2024
6f4c98e
Merge branch 'zkevm' into executor-block-by-block
kstoykov Jul 30, 2024
944a4ad
organize stage in struct
kstoykov Jul 30, 2024
a677ba7
verifier
kstoykov Jul 30, 2024
f2710ad
update delete blocks func
kstoykov Jul 30, 2024
d1be243
verifier
kstoykov Jul 30, 2024
75b6051
add missing return err
kstoykov Jul 31, 2024
8c42301
add account + storage index into execution stage
kstoykov Jul 31, 2024
2ba343d
update unwind and stages processing
kstoykov Jul 31, 2024
e8d8d9b
limbo
kstoykov Aug 1, 2024
af2d4fe
add limbo for single block in a batch only
kstoykov Aug 1, 2024
4062ae8
code cleanup
kstoykov Aug 2, 2024
e98d8d6
remove batch instant close
kstoykov Aug 2, 2024
d37d92c
remove defer
kstoykov Aug 2, 2024
7ee3982
fix limbo witness generation
kstoykov Aug 2, 2024
fb3d167
restoring blockEnd in DS
kstoykov Aug 2, 2024
01d8593
update witness generation in case of limbo
kstoykov Aug 2, 2024
8885ece
Merge branch 'zkevm' into executor-block-by-block
kstoykov Aug 2, 2024
fb9ae1e
remove hardcoded forkid
kstoykov Aug 2, 2024
1b8d293
create our buckets into txpool db
kstoykov Aug 2, 2024
5239d76
optimizing datastream writes
V-Staykov Aug 2, 2024
7cce826
update comment
kstoykov Aug 5, 2024
7e72b4f
add timer, remove old code and organize func's args
kstoykov Aug 5, 2024
21ddfb5
Merge branch 'zkevm' into executor-block-by-block
kstoykov Aug 6, 2024
894cda9
l1infotree - highest seen + tiny refactor (#918)
kstoykov Aug 6, 2024
b372a7c
add flag for verification timeout
kstoykov Aug 6, 2024
7dd5658
Merge branch 'zkevm' into executor-block-by-block
kstoykov Aug 6, 2024
fb98560
Merge branch 'zkevm' into executor-block-by-block
kstoykov Aug 6, 2024
417f603
remove unused flag
kstoykov Aug 7, 2024
53af394
update kurtosis params file
kstoykov Aug 7, 2024
85dcc92
update kurtosis
kstoykov Aug 7, 2024
8bb9141
update kurtosis tests
kstoykov Aug 7, 2024
ac60be3
make Counter & GER to be per-block stored
kstoykov Aug 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,11 +471,6 @@ var (
Usage: "Batch seal time. Defaults to 12s",
Value: "12s",
}
SequencerNonEmptyBatchSealTime = cli.StringFlag{
Name: "zkevm.sequencer-non-empty-batch-seal-time",
Usage: "Batch seal time. Defaults to 3s",
Value: "3s",
}
SequencerHaltOnBatchNumber = cli.Uint64Flag{
Name: "zkevm.sequencer-halt-on-batch-number",
Usage: "Halt the sequencer on this batch number",
Expand Down
31 changes: 0 additions & 31 deletions core/blockchain_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,37 +296,6 @@ func PrepareBlockTxExecution(
return &blockContextImpl, excessDataGas, &blockGer, &blockL1BlockHash, nil
}

func FinalizeBlockExecutionWithHistoryWrite(
engine consensus.Engine, stateReader state.StateReader,
header *types.Header, txs types.Transactions, uncles []*types.Header,
stateWriter state.WriterWithChangeSets, cc *chain.Config,
ibs *state.IntraBlockState, receipts types.Receipts,
withdrawals []*types.Withdrawal, headerReader consensus.ChainHeaderReader,
isMining bool, excessDataGas *big.Int,
) (newBlock *types.Block, newTxs types.Transactions, newReceipt types.Receipts, err error) {
newBlock, newTxs, newReceipt, err = FinalizeBlockExecution(
engine,
stateReader,
header,
txs,
uncles,
stateWriter,
cc,
ibs,
receipts,
withdrawals,
headerReader,
isMining,
excessDataGas,
)

if err := stateWriter.WriteHistory(); err != nil {
return nil, nil, nil, fmt.Errorf("writing history for block %d failed: %w", header.Number.Uint64(), err)
}

return newBlock, newTxs, newReceipt, nil
}

func CreateReceiptForBlockInfoTree(receipt *types.Receipt, chainConfig *chain.Config, blockNum uint64, execResult *ExecutionResult) *types.Receipt {
// [hack]TODO: remove this after bug is fixed
localReceipt := receipt.Clone()
Expand Down
2 changes: 1 addition & 1 deletion core/vm/zk_batch_counters.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (bcc *BatchCounterCollector) CombineCollectors(verifyMerkleProof bool) (Cou
// rlp level counters and execution level counters
// this one returns the counters as they are so far, without adding processBatchLevelData, processChangeL2Block and decodeChangeL2BlockTx
// used to save batch counter progress without adding the said counters twice
func (bcc *BatchCounterCollector) CombineCollectorsNoChanges(verifyMerkleProof bool) Counters {
func (bcc *BatchCounterCollector) CombineCollectorsNoChanges() Counters {
// combine all the counters we have so far

// if we have external counters use them, otherwise create new
Expand Down
31 changes: 13 additions & 18 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,19 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {

}()

tx, err := backend.chainDB.BeginRw(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback()

if !config.DeprecatedTxPool.Disable {
// we need to start the pool before stage loop itself
// the pool holds the info about how execution stage should work - as regular or as limbo recovery
if err := backend.txPool2.StartIfNotStarted(ctx, backend.txPool2DB, tx); err != nil {
return nil, err
}

backend.txPool2Fetch.ConnectCore()
backend.txPool2Fetch.ConnectSentries()
var newTxsBroadcaster *txpool2.NewSlotsStreams
Expand Down Expand Up @@ -696,12 +708,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {

backend.ethBackendRPC, backend.miningRPC, backend.stateChangesClient = ethBackendRPC, miningRPC, stateDiffClient

tx, err := backend.chainDB.BeginRw(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback()

// create buckets
if err := createBuckets(tx); err != nil {
return nil, err
Expand Down Expand Up @@ -833,7 +839,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
backend.engine,
)

var legacyExecutors []legacy_executor_verifier.ILegacyExecutor
var legacyExecutors []*legacy_executor_verifier.Executor = make([]*legacy_executor_verifier.Executor, 0, len(cfg.ExecutorUrls))
if len(cfg.ExecutorUrls) > 0 && cfg.ExecutorUrls[0] != "" {
levCfg := legacy_executor_verifier.Config{
GrpcUrls: cfg.ExecutorUrls,
Expand All @@ -853,7 +859,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
backend.chainConfig,
backend.chainDB,
witnessGenerator,
backend.l1Syncer,
backend.dataStream,
)

Expand All @@ -866,12 +871,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// we switch context from being an RPC node to a sequencer
backend.txPool2.ForceUpdateLatestBlock(executionProgress)

// we need to start the pool before stage loop itself
// the pool holds the info about how execution stage should work - as regular or as limbo recovery
if err := backend.txPool2.StartIfNotStarted(ctx, backend.txPool2DB, tx); err != nil {
return nil, err
}

l1BlockSyncer := syncer.NewL1Syncer(
ctx,
ethermanClients,
Expand Down Expand Up @@ -967,10 +966,6 @@ func createBuckets(tx kv.RwTx) error {
return err
}

if err := txpool.CreateTxPoolBuckets(tx); err != nil {
return err
}

return nil
}

Expand Down
1 change: 0 additions & 1 deletion eth/ethconfig/config_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type Zk struct {
DatastreamVersion int
SequencerBlockSealTime time.Duration
SequencerBatchSealTime time.Duration
SequencerNonEmptyBatchSealTime time.Duration
SequencerHaltOnBatchNumber uint64
ExecutorUrls []string
ExecutorStrictMode bool
Expand Down
9 changes: 9 additions & 0 deletions eth/stagedsync/stage_indexes_zkevm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package stagedsync

import (
"github.com/gateway-fm/cdk-erigon-lib/kv"
)

// PromoteHistory is an exported wrapper around the package-private
// promoteHistory, letting code outside this package (e.g. the zk execution
// stages) build history indexes over the given changeset bucket for the
// block range start..stop. The quit channel allows the caller to abort the
// promotion early.
func PromoteHistory(logPrefix string, tx kv.RwTx, changesetBucket string, start, stop uint64, cfg HistoryCfg, quit <-chan struct{}) error {
	return promoteHistory(logPrefix, tx, changesetBucket, start, stop, cfg, quit)
}
6 changes: 3 additions & 3 deletions eth/stagedsync/stages/stages_zk.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
ForkId SyncStage = "ForkId"
L1SequencerSync SyncStage = "L1SequencerSync"
L1InfoTree SyncStage = "L1InfoTree"
HighestUsedL1InfoIndex SyncStage = "HighestUsedL1InfoTree"
SequenceExecutorVerify SyncStage = "SequenceExecutorVerify"
L1BlockSync SyncStage = "L1BlockSync"
// HighestUsedL1InfoIndex SyncStage = "HighestUsedL1InfoTree"
SequenceExecutorVerify SyncStage = "SequenceExecutorVerify"
L1BlockSync SyncStage = "L1BlockSync"
)
2 changes: 2 additions & 0 deletions smt/pkg/db/mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const TableAccountValues = "HermezSmtAccountValues"
const TableMetadata = "HermezSmtMetadata"
const TableHashKey = "HermezSmtHashKey"

var HermezSmtTables = []string{TableSmt, TableStats, TableAccountValues, TableMetadata, TableHashKey}

type EriDb struct {
kvTx kv.RwTx
tx SmtDbTx
Expand Down
1 change: 0 additions & 1 deletion turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ var DefaultFlags = []cli.Flag{
&utils.SmtRegenerateInMemory,
&utils.SequencerBlockSealTime,
&utils.SequencerBatchSealTime,
&utils.SequencerNonEmptyBatchSealTime,
&utils.SequencerHaltOnBatchNumber,
&utils.ExecutorUrls,
&utils.ExecutorStrictMode,
Expand Down
7 changes: 0 additions & 7 deletions turbo/cli/flags_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
panic(fmt.Sprintf("could not parse sequencer batch seal time timeout value %s", sequencerBatchSealTimeVal))
}

sequencerNonEmptyBatchSealTimeVal := ctx.String(utils.SequencerNonEmptyBatchSealTime.Name)
sequencerNonEmptyBatchSealTime, err := time.ParseDuration(sequencerNonEmptyBatchSealTimeVal)
if err != nil {
panic(fmt.Sprintf("could not parse sequencer batch seal time timeout value %s", sequencerNonEmptyBatchSealTimeVal))
}

effectiveGasPriceForEthTransferVal := ctx.Float64(utils.EffectiveGasPriceForEthTransfer.Name)
effectiveGasPriceForErc20TransferVal := ctx.Float64(utils.EffectiveGasPriceForErc20Transfer.Name)
effectiveGasPriceForContractInvocationVal := ctx.Float64(utils.EffectiveGasPriceForContractInvocation.Name)
Expand Down Expand Up @@ -128,7 +122,6 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
SmtRegenerateInMemory: ctx.Bool(utils.SmtRegenerateInMemory.Name),
SequencerBlockSealTime: sequencerBlockSealTime,
SequencerBatchSealTime: sequencerBatchSealTime,
SequencerNonEmptyBatchSealTime: sequencerNonEmptyBatchSealTime,
SequencerHaltOnBatchNumber: ctx.Uint64(utils.SequencerHaltOnBatchNumber.Name),
ExecutorUrls: strings.Split(strings.ReplaceAll(ctx.String(utils.ExecutorUrls.Name), " ", ""), ","),
ExecutorStrictMode: ctx.Bool(utils.ExecutorStrictMode.Name),
Expand Down
3 changes: 1 addition & 2 deletions turbo/stages/zk_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ func NewSequencerZkStages(ctx context.Context,
zkStages.StageL1InfoTreeCfg(db, cfg.Zk, l1InfoTreeSyncer),
zkStages.StageSequencerL1BlockSyncCfg(db, cfg.Zk, l1BlockSyncer),
zkStages.StageDataStreamCatchupCfg(datastreamServer, db, cfg.Genesis.Config.ChainID.Uint64(), cfg.DatastreamVersion, cfg.HasExecutors()),
zkStages.StageSequencerInterhashesCfg(db, notifications.Accumulator),
zkStages.StageSequenceBlocksCfg(
db,
cfg.Prune,
Expand All @@ -139,11 +138,11 @@ func NewSequencerZkStages(ctx context.Context,
cfg.Zk,
txPool,
txPoolDb,
verifier,
uint16(cfg.YieldSize),
),
stagedsync.StageHashStateCfg(db, dirs, cfg.HistoryV3, agg),
zkStages.StageZkInterHashesCfg(db, true, true, false, dirs.Tmp, blockReader, controlServer.Hd, cfg.HistoryV3, agg, cfg.Zk),
zkStages.StageSequencerExecutorVerifyCfg(db, verifier, txPool, controlServer.ChainConfig, cfg.Zk),
stagedsync.StageHistoryCfg(db, cfg.Prune, dirs.Tmp),
stagedsync.StageLogIndexCfg(db, cfg.Prune, dirs.Tmp),
stagedsync.StageCallTracesCfg(db, cfg.Prune, 0, dirs.Tmp),
Expand Down
101 changes: 73 additions & 28 deletions zk/datastream/server/data_stream_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type DataStreamServer struct {
stream *datastreamer.StreamServer
chainId uint64
highestBlockWritten,
highestClosedBatchWritten,
highestBatchWritten *uint64
}

Expand Down Expand Up @@ -118,7 +119,33 @@ func NewDataStreamEntries(size int) *DataStreamEntries {
}
}

func (srv *DataStreamServer) CommitEntriesToStreamProto(entries []DataStreamEntryProto, latestBlockNum, latestBatchNum *uint64) error {
// commitAtomicOp finalises the current atomic operation on the underlying
// stream and, on success, caches the highest block / batch / closed-batch
// numbers written so far.
//
// Each tracked value arrives as a *uint64 so the caller can signal "leave the
// cached value alone" with nil; since 0 is itself a valid number it cannot be
// used as a sentinel. The pointed-to values are copied before being stored,
// so later mutation by the caller cannot corrupt the cached state.
func (srv *DataStreamServer) commitAtomicOp(latestBlockNum, latestBatchNum, latestClosedBatch *uint64) error {
	if err := srv.stream.CommitAtomicOp(); err != nil {
		return err
	}

	if latestBlockNum != nil {
		block := *latestBlockNum
		srv.highestBlockWritten = &block
	}
	if latestBatchNum != nil {
		batch := *latestBatchNum
		srv.highestBatchWritten = &batch
	}
	if latestClosedBatch != nil {
		closed := *latestClosedBatch
		srv.highestClosedBatchWritten = &closed
	}

	return nil
}

func (srv *DataStreamServer) commitEntriesToStreamProto(entries []DataStreamEntryProto) error {
for _, entry := range entries {
entryType := entry.Type()

Expand All @@ -137,16 +164,6 @@ func (srv *DataStreamServer) CommitEntriesToStreamProto(entries []DataStreamEntr
}
}
}

if latestBlockNum != nil {
a := *latestBlockNum
srv.highestBlockWritten = &a
}

if latestBatchNum != nil {
a := *latestBatchNum
srv.highestBatchWritten = &a
}
return nil
}

Expand Down Expand Up @@ -454,24 +471,12 @@ func (srv *DataStreamServer) GetHighestBatchNumber() (uint64, error) {
return *srv.highestBatchWritten, nil
}

header := srv.stream.GetHeader()

if header.TotalEntries == 0 {
return 0, nil
entry, found, err := srv.getLastEntryOfType(datastreamer.EntryType(types.EntryTypeBatchStart))
if err != nil {
return 0, err
}

entryNum := header.TotalEntries - 1
var err error
var entry datastreamer.FileEntry
for {
entry, err = srv.stream.GetEntry(entryNum)
if err != nil {
return 0, err
}
if entry.Type == datastreamer.EntryType(1) {
break
}
entryNum -= 1
if !found {
return 0, nil
}

batch, err := types.UnmarshalBatchStart(entry.Data)
Expand All @@ -484,6 +489,28 @@ func (srv *DataStreamServer) GetHighestBatchNumber() (uint64, error) {
return batch.Number, nil
}

// GetHighestClosedBatch returns the number of the most recently closed batch
// in the data stream, i.e. the batch of the last batch-end entry. The result
// is cached on the server after the first successful lookup; when the stream
// contains no batch-end entry yet, (0, nil) is returned.
func (srv *DataStreamServer) GetHighestClosedBatch() (uint64, error) {
	if cached := srv.highestClosedBatchWritten; cached != nil {
		return *cached, nil
	}

	lastEnd, ok, err := srv.getLastEntryOfType(datastreamer.EntryType(types.EntryTypeBatchEnd))
	if err != nil || !ok {
		// err == nil && !ok means "no such entry": report batch 0 without error
		return 0, err
	}

	batchEnd, err := types.UnmarshalBatchEnd(lastEnd.Data)
	if err != nil {
		return 0, err
	}

	srv.highestClosedBatchWritten = &batchEnd.Number
	return batchEnd.Number, nil
}

// must be done on offline server
// finds the position of the block bookmark entry and deletes from it onward
// blockNumber 10 would return the stream to before block 10 bookmark
Expand Down Expand Up @@ -523,3 +550,21 @@ func (srv *DataStreamServer) UnwindToBatchStart(batchNumber uint64) error {

return srv.stream.TruncateFile(entryNum)
}

// getLastEntryOfType scans the stream backwards from the newest entry and
// returns the most recent entry whose type equals entryType. The boolean
// result reports whether such an entry was found; a zero-value FileEntry is
// returned when it was not, or when reading an entry fails.
func (srv *DataStreamServer) getLastEntryOfType(entryType datastreamer.EntryType) (datastreamer.FileEntry, bool, error) {
	header := srv.stream.GetHeader()
	emptyEntry := datastreamer.FileEntry{}

	// iterate with a signed index: an unsigned counter would wrap at 0 and
	// loop forever. Subtract 1 after the int64 conversion so that
	// TotalEntries == 0 yields -1 directly instead of relying on uint64
	// underflow followed by truncation.
	for entryNum := int64(header.TotalEntries) - 1; entryNum >= 0; entryNum-- {
		entry, err := srv.stream.GetEntry(uint64(entryNum))
		if err != nil {
			return emptyEntry, false, err
		}
		if entry.Type == entryType {
			return entry, true, nil
		}
	}

	return emptyEntry, false, nil
}
Loading
Loading