Skip to content

Commit

Permalink
update (#916)
Browse files Browse the repository at this point in the history
  • Loading branch information
zjg555543 authored Aug 5, 2024
1 parent 32b4b68 commit 2d13042
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions zk/datastream/server/datastream_populate.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ func (srv *DataStreamServer) WriteWholeBatchToStream(
batchNum uint64,
) error {
var err error
if err = srv.stream.StartAtomicOp(); err != nil {
return err
}
defer srv.stream.RollbackAtomicOp()

blocksForBatch, err := reader.GetL2BlockNosByBatch(batchNum)
if err != nil {
return err
Expand All @@ -58,6 +53,11 @@ func (srv *DataStreamServer) WriteWholeBatchToStream(
return err
}

if err = srv.stream.StartAtomicOp(); err != nil {
return err
}
defer srv.stream.RollbackAtomicOp()

blocks := make([]eritypes.Block, 0)
txsPerBlock := make(map[uint64][]eritypes.Transaction)
for blockNumber := fromBlockNum; blockNumber <= toBlockNum; blockNumber++ {
Expand Down Expand Up @@ -116,14 +116,14 @@ func (srv *DataStreamServer) WriteBlocksToStreamConsecutively(
return err
}

if err = srv.stream.StartAtomicOp(); err != nil {
if err = srv.UnwindIfNecessary(logPrefix, reader, from, latestbatchNum, batchNum); err != nil {
return err
}
defer srv.stream.RollbackAtomicOp()

if err = srv.UnwindIfNecessary(logPrefix, reader, from, latestbatchNum, batchNum); err != nil {
if err = srv.stream.StartAtomicOp(); err != nil {
return err
}
defer srv.stream.RollbackAtomicOp()

// check if a new batch starts and the old needs closing before that
// if it is already closed with a batch end, do not add a new batch end
Expand Down Expand Up @@ -220,17 +220,17 @@ func (srv *DataStreamServer) WriteBlockWithBatchStartToStream(
t := utils.StartTimer("write-stream", "writeblockstostream")
defer t.LogTimer()

if err = srv.stream.StartAtomicOp(); err != nil {
return err
}
defer srv.stream.RollbackAtomicOp()

blockNum := block.NumberU64()

if err = srv.UnwindIfNecessary(logPrefix, reader, blockNum, prevBlockBatchNum, batchNum); err != nil {
return err
}

if err = srv.stream.StartAtomicOp(); err != nil {
return err
}
defer srv.stream.RollbackAtomicOp()

// if start of new batch add batch start entries
var batchStartEntries *DataStreamEntries
if prevBlockBatchNum != batchNum {
Expand Down

0 comments on commit 2d13042

Please sign in to comment.