From 569ec3be7c9067942c3d402a164596d20da88fe1 Mon Sep 17 00:00:00 2001 From: hexoscott <70711990+hexoscott@users.noreply.github.com> Date: Thu, 22 Aug 2024 16:30:39 +0100 Subject: [PATCH] =?UTF-8?q?datastream=20repopulation=20logic=20changes=20a?= =?UTF-8?q?nd=20removal=20of=20batch=20partially=20=E2=80=A6=20(#992)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * datastream repopulation logic changes and removal of batch partially processed partially processed removed in favour of simply sealing the WIP batch on a restart * refactor of checking for stream gap in sequencing * do not connect to datastream during startup this causes timeout problems on new nodes that could spend a long time running L1 sync * refactor of batch end logic in sequencing * tidy up and comments around new datastream handling in sequencer --- eth/backend.go | 21 +------ zk/hermez_db/db.go | 25 --------- zk/stages/stage_dataStreamCatchup.go | 21 +++++-- zk/stages/stage_sequence_execute.go | 33 ++++++----- zk/stages/stage_sequence_execute_batch.go | 29 +--------- zk/stages/stage_sequence_execute_blocks.go | 6 -- .../stage_sequence_execute_data_stream.go | 55 ++++++++++++++----- .../stage_sequence_execute_injected_batch.go | 3 +- zk/stages/stage_sequence_execute_unwind.go | 9 --- 9 files changed, 78 insertions(+), 124 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index 53070c83706..aad639d4c4c 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -753,9 +753,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { latestHeader := backend.dataStream.GetHeader() if latestHeader.TotalEntries == 0 { log.Info("[dataStream] setting the stream progress to 0") - if err := stages.SaveStageProgress(tx, stages.DataStream, 0); err != nil { - return nil, err - } backend.preStartTasks.WarmUpDataStream = true } } @@ -1052,23 +1049,7 @@ func newEtherMan(cfg *ethconfig.Config, l2ChainName, url string) *etherman.Clien // creates a datastream client with default parameters func initDataStreamClient(ctx context.Context, cfg *ethconfig.Zk, latestForkId uint16) *client.StreamClient { - // datastream - // Create client - log.Info("Starting datastream client...") - // retry connection - datastreamClient := client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId) - - for i := 0; i < 30; i++ { - // Start client (connect to the server) - if err := datastreamClient.Start(); err != nil { - log.Warn(fmt.Sprintf("Error when starting datastream client, retrying... 
Error: %s", err)) - time.Sleep(1 * time.Second) - } else { - log.Info("Datastream client initialized...") - return datastreamClient - } - } - panic("datastream client could not be initialized") + return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId) } func (backend *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error { diff --git a/zk/hermez_db/db.go b/zk/hermez_db/db.go index 3130db81570..2adbf23cee7 100644 --- a/zk/hermez_db/db.go +++ b/zk/hermez_db/db.go @@ -1610,31 +1610,6 @@ func (db *HermezDbReader) GetInvalidBatch(batchNo uint64) (bool, error) { return len(v) > 0, nil } -func (db *HermezDb) WriteIsBatchPartiallyProcessed(batchNo uint64) error { - return db.tx.Put(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo), []byte{1}) -} - -func (db *HermezDb) DeleteIsBatchPartiallyProcessed(batchNo uint64) error { - return db.tx.Delete(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo)) -} - -func (db *HermezDbReader) GetIsBatchPartiallyProcessed(batchNo uint64) (bool, error) { - v, err := db.tx.GetOne(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo)) - if err != nil { - return false, err - } - return len(v) > 0, nil -} - -func (db *HermezDb) TruncateIsBatchPartiallyProcessed(fromBatch, toBatch uint64) error { - for batch := fromBatch; batch <= toBatch; batch++ { - if err := db.DeleteIsBatchPartiallyProcessed(batch); err != nil { - return err - } - } - return nil -} - func (db *HermezDb) WriteLocalExitRootForBatchNo(batchNo uint64, root common.Hash) error { return db.tx.Put(LOCAL_EXIT_ROOTS, Uint64ToBytes(batchNo), root.Bytes()) } diff --git a/zk/stages/stage_dataStreamCatchup.go b/zk/stages/stage_dataStreamCatchup.go index f3cae1b3ca3..199fe4d69d4 100644 --- a/zk/stages/stage_dataStreamCatchup.go +++ b/zk/stages/stage_dataStreamCatchup.go @@ -12,6 +12,7 @@ import ( "github.com/ledgerwatch/erigon/zk/datastream/server" "github.com/ledgerwatch/erigon/zk/hermez_db" "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/erigon/zk/sequencer" ) type DataStreamCatchupCfg struct { @@ -80,12 +81,24 @@ func CatchupDatastream(ctx context.Context, logPrefix string, tx kv.RwTx, stream srv := server.NewDataStreamServer(stream, chainId) reader := hermez_db.NewHermezDbReader(tx) - finalBlockNumber, err := stages.GetStageProgress(tx, stages.Execution) - if err != nil { - return 0, err + var ( + err error + finalBlockNumber uint64 + ) + + if sequencer.IsSequencer() { + finalBlockNumber, err = stages.GetStageProgress(tx, stages.DataStream) + if err != nil { + return 0, err + } + } else { + finalBlockNumber, err = stages.GetStageProgress(tx, stages.Execution) + if err != nil { + return 0, err + } } - previousProgress, err := stages.GetStageProgress(tx, stages.DataStream) + previousProgress, err := srv.GetHighestBlockNumber() if err != nil { return 0, err } diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index 842e3423ca2..c3f99e8caa1 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -46,11 +46,6 @@ func SpawnSequencingStage( return err } - isLastBatchPariallyProcessed, err := sdb.hermezDb.GetIsBatchPartiallyProcessed(lastBatch) - if err != nil { - return err - } - forkId, err := prepareForkId(lastBatch, executionAt, sdb.hermezDb) if err != nil { return err @@ -66,7 +61,7 @@ func SpawnSequencingStage( var block *types.Block runLoopBlocks := true batchContext := newBatchContext(ctx, &cfg, &historyCfg, s, sdb) - batchState := newBatchState(forkId, 
prepareBatchNumber(lastBatch, isLastBatchPariallyProcessed), !isLastBatchPariallyProcessed && cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool)
+	batchState := newBatchState(forkId, lastBatch+1, cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool)
 	blockDataSizeChecker := newBlockDataChecker()
 	streamWriter := newSequencerBatchStreamWriter(batchContext, batchState, lastBatch) // using lastBatch (rather than batchState.batchNumber) is not a mistake
@@ -79,31 +74,35 @@ func SpawnSequencingStage(
 		if err = cfg.datastreamServer.WriteWholeBatchToStream(logPrefix, sdb.tx, sdb.hermezDb.HermezDbReader, lastBatch, injectedBatchBatchNumber); err != nil {
 			return err
 		}
+		if err = stages.SaveStageProgress(sdb.tx, stages.DataStream, 1); err != nil {
+			return err
+		}
 		return sdb.tx.Commit()
 	}
 
+	// handle cases where the last batch wasn't committed to the data stream.
+	// this could occur because we're migrating from an RPC node to a sequencer,
+	// or because the sequencer was restarted before all processes completed (e.g. waiting for the remote executor).
+	// we consider the data stream verified by the executor, so we treat it as "safe" and unwind any blocks
+	// beyond it that we identify. During normal operation this function simply checks and moves on without
+	// performing any action.
+	isUnwinding, err := handleBatchEndChecks(batchContext, batchState, executionAt, u)
+	if err != nil || isUnwinding {
+		return err
+	}
+
 	tryHaltSequencer(batchContext, batchState.batchNumber)
 
 	if err := utils.UpdateZkEVMBlockCfg(cfg.chainConfig, sdb.hermezDb, logPrefix); err != nil {
 		return err
 	}
 
-	batchCounters, err := prepareBatchCounters(batchContext, batchState, isLastBatchPariallyProcessed)
+	batchCounters, err := prepareBatchCounters(batchContext, batchState, nil)
 	if err != nil {
 		return err
 	}
 
-	if !isLastBatchPariallyProcessed {
-		// handle case where batch wasn't closed properly
-		// close it before starting a new one
-		// this occurs when sequencer was switched from syncer or sequencer datastream files were deleted
-		// and datastream was regenerated
-		if err = finalizeLastBatchInDatastreamIfNotFinalized(batchContext, batchState, executionAt); err != nil {
-			return err
-		}
-	}
-
 	if batchState.isL1Recovery() {
 		if cfg.zk.L1SyncStopBatch > 0 && batchState.batchNumber > cfg.zk.L1SyncStopBatch {
 			log.Info(fmt.Sprintf("[%s] L1 recovery has completed!", logPrefix), "batch", batchState.batchNumber)
diff --git a/zk/stages/stage_sequence_execute_batch.go b/zk/stages/stage_sequence_execute_batch.go
index 6f7dc4275da..ec0fec521c6 100644
--- a/zk/stages/stage_sequence_execute_batch.go
+++ b/zk/stages/stage_sequence_execute_batch.go
@@ -12,28 +12,7 @@ import (
 	"github.com/ledgerwatch/log/v3"
 )
 
-func prepareBatchNumber(lastBatch uint64, isLastBatchPariallyProcessed bool) uint64 {
-	if isLastBatchPariallyProcessed {
-		return lastBatch
-	}
-
-	return lastBatch + 1
-}
-
-func prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, isLastBatchPariallyProcessed bool) (*vm.BatchCounterCollector, error) {
-	var intermediateUsedCounters *vm.Counters
-	if isLastBatchPariallyProcessed {
-		intermediateCountersMap, found, err := batchContext.sdb.hermezDb.GetLatestBatchCounters(batchState.batchNumber)
-		if err != nil {
-			return nil, err
-		}
-		if found {
-			intermediateUsedCounters = vm.NewCountersFromUsedMap(intermediateCountersMap)
-		} else {
-			log.Warn("intermediate counters not found for batch, initialising with empty counters", "batch", batchState.batchNumber)
-		}
-	}
-
+func 
prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, intermediateUsedCounters *vm.Counters) (*vm.BatchCounterCollector, error) { return vm.NewBatchCounterCollector(batchContext.sdb.smt.GetDepth(), uint16(batchState.forkId), batchContext.cfg.zk.VirtualCountersSmtReduction, batchContext.cfg.zk.ShouldCountersBeUnlimited(batchState.isL1Recovery()), intermediateUsedCounters), nil } @@ -66,9 +45,6 @@ func doCheckForBadBatch(batchContext *BatchContext, batchState *BatchState, this if err = batchContext.sdb.hermezDb.WriteBatchCounters(currentBlock.NumberU64(), map[string]int{}); err != nil { return false, err } - if err = batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(batchState.batchNumber); err != nil { - return false, err - } if err = stages.SaveStageProgress(batchContext.sdb.tx, stages.HighestSeenBatchNumber, batchState.batchNumber); err != nil { return false, err } @@ -158,9 +134,6 @@ func runBatchLastSteps( if err = batchContext.sdb.hermezDb.WriteBatchCounters(blockNumber, counters.UsedAsMap()); err != nil { return err } - if err := batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(thisBatch); err != nil { - return err - } // Local Exit Root (ler): read s/c storage every batch to store the LER for the highest block in the batch ler, err := utils.GetBatchLocalExitRootFromSCStorage(thisBatch, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx) diff --git a/zk/stages/stage_sequence_execute_blocks.go b/zk/stages/stage_sequence_execute_blocks.go index 495e9114846..060c753a26d 100644 --- a/zk/stages/stage_sequence_execute_blocks.go +++ b/zk/stages/stage_sequence_execute_blocks.go @@ -247,12 +247,6 @@ func finaliseBlock( return nil, err } - // write partially processed - err = batchContext.sdb.hermezDb.WriteIsBatchPartiallyProcessed(batchState.batchNumber) - if err != nil { - return nil, err - } - // this is actually account + storage indices stages quitCh := batchContext.ctx.Done() from := newNum.Uint64() diff --git a/zk/stages/stage_sequence_execute_data_stream.go b/zk/stages/stage_sequence_execute_data_stream.go index 373511a1212..5e2849ce44f 100644 --- a/zk/stages/stage_sequence_execute_data_stream.go +++ b/zk/stages/stage_sequence_execute_data_stream.go @@ -7,8 +7,10 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/zk/datastream/server" verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier" - "github.com/ledgerwatch/erigon/zk/utils" "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/eth/stagedsync" + "github.com/ledgerwatch/erigon/core/vm" ) type SequencerBatchStreamWriter struct { @@ -62,6 +64,10 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun return checkedVerifierBundles, err } + if err = stages.SaveStageProgress(sbc.sdb.tx, stages.DataStream, block.NumberU64()); err != nil { + return checkedVerifierBundles, err + } + // once we have handled the very first block we can update the last batch to be the current batch safely so that // we don't keep adding batch bookmarks in between blocks sbc.lastBatch = request.BatchNumber @@ -78,29 +84,52 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun return checkedVerifierBundles, nil } -func finalizeLastBatchInDatastreamIfNotFinalized(batchContext *BatchContext, batchState *BatchState, thisBlock uint64) error { +func handleBatchEndChecks(batchContext *BatchContext, batchState *BatchState, thisBlock uint64, u 
stagedsync.Unwinder) (bool, error) { isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd() if err != nil { - return err + return false, err } if isLastEntryBatchEnd { - return nil + return false, nil } - log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), batchState.batchNumber)) - ler, err := utils.GetBatchLocalExitRootFromSCStorage(batchState.batchNumber, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx) + lastBatch := batchState.batchNumber - 1 + + log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), lastBatch)) + + rawCounters, _, err := batchContext.sdb.hermezDb.GetLatestBatchCounters(lastBatch) if err != nil { - return err + return false, err + } + + latestCounters := vm.NewCountersFromUsedMap(rawCounters) + + endBatchCounters, err := prepareBatchCounters(batchContext, batchState, latestCounters) + + if err = runBatchLastSteps(batchContext, lastBatch, thisBlock, endBatchCounters); err != nil { + return false, err } - lastBlock, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, thisBlock) + // now check if there is a gap in the stream vs the state db + streamProgress, err := stages.GetStageProgress(batchContext.sdb.tx, stages.DataStream) if err != nil { - return err + return false, err } - root := lastBlock.Root() - if err = batchContext.cfg.datastreamServer.WriteBatchEnd(batchContext.sdb.hermezDb, batchState.batchNumber-1, &root, &ler); err != nil { - return err + + unwinding := false + if streamProgress > 0 && streamProgress < thisBlock { + block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, streamProgress) + if err != nil { + return true, err + } + log.Warn(fmt.Sprintf("[%s] Unwinding due to a datastream gap", batchContext.s.LogPrefix()), + "streamHeight", streamProgress, + "sequencerHeight", thisBlock, + ) + u.UnwindTo(streamProgress, block.Hash()) + unwinding = true } - return nil + + return unwinding, nil } diff --git a/zk/stages/stage_sequence_execute_injected_batch.go b/zk/stages/stage_sequence_execute_injected_batch.go index 323b7a0f2f9..e1917b7748a 100644 --- a/zk/stages/stage_sequence_execute_injected_batch.go +++ b/zk/stages/stage_sequence_execute_injected_batch.go @@ -80,8 +80,7 @@ func processInjectedInitialBatch( return err } - // deleting the partially processed flag - return batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(injectedBatchBatchNumber) + return err } func handleInjectedBatch( diff --git a/zk/stages/stage_sequence_execute_unwind.go b/zk/stages/stage_sequence_execute_unwind.go index 46c0a58846f..b8918aa33d7 100644 --- a/zk/stages/stage_sequence_execute_unwind.go +++ b/zk/stages/stage_sequence_execute_unwind.go @@ -137,15 +137,6 @@ func UnwindSequenceExecutionStageDbWrites(ctx context.Context, u *stagedsync.Unw if err = hermezDb.DeleteBatchCounters(u.UnwindPoint+1, s.BlockNumber); err != nil { return fmt.Errorf("truncate block batches error: %v", err) } - // only seq - if err = hermezDb.TruncateIsBatchPartiallyProcessed(fromBatch, toBatch); err != nil { - return fmt.Errorf("truncate fork id error: %v", err) - } - if lastBatchToKeepBeforeFrom == fromBatch { - if err = hermezDb.WriteIsBatchPartiallyProcessed(lastBatchToKeepBeforeFrom); err != nil { - return fmt.Errorf("truncate fork id error: %v", err) - } - } return nil }
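
Reviewer note (not part of the patch): the gap check added in handleBatchEndChecks above reduces to comparing the DataStream stage progress with the sequencer's execution height and unwinding back to the stream height when the stream lags. Below is a minimal, self-contained Go sketch of that decision only; the names decideUnwind and main are illustrative and do not exist in the repository.

package main

import "fmt"

// decideUnwind sketches the core check in handleBatchEndChecks: if the datastream
// stage progress is non-zero and behind the sequencer's execution height, the node
// should unwind back to the stream height so the state db and the stream agree.
func decideUnwind(streamProgress, executionHeight uint64) (bool, uint64) {
	if streamProgress > 0 && streamProgress < executionHeight {
		return true, streamProgress
	}
	return false, executionHeight
}

func main() {
	fmt.Println(decideUnwind(95, 100))  // stream behind the state db: true 95 (unwind to block 95)
	fmt.Println(decideUnwind(100, 100)) // stream in sync: false 100 (no action)
	fmt.Println(decideUnwind(0, 100))   // fresh stream, progress 0: false 100 (nothing to compare yet)
}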