Skip to content

Commit

Permalink
Build blocks asynchronously
Browse files Browse the repository at this point in the history
When it is a node's turn to propose or verify a block, it calls the BlockBuilder's BuildBlock() and Verify() methods respectively.
However, this method may take a long time to complete, and during this time, the Epoch instance cannot process messages (since proposing a new block may be called indirectly via HandleMessage).

This commit adds an asynchronous task scheduler which the Epoch uses to register a callback to make itself propose the block or verify it
it asynchronously and continue the program flow from that point onwards.

While the block is verified or built, Simplex is free to handle other messages.

Signed-off-by: Yacov Manevich <[email protected]>
  • Loading branch information
yacovm committed Jan 14, 2025
1 parent 1a018a7 commit 92eb71e
Show file tree
Hide file tree
Showing 6 changed files with 488 additions and 42 deletions.
223 changes: 187 additions & 36 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,22 @@ package simplex
import (
"bytes"
"context"
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"simplex/record"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"
)

const defaultMaxRoundWindow = 10
const (
defaultMaxRoundWindow = 10
defaultMaxPendingBlocks = 10
)

type Round struct {
num uint64
Expand Down Expand Up @@ -55,6 +60,8 @@ type EpochConfig struct {
type Epoch struct {
EpochConfig
// Runtime
sched *scheduler
lock sync.Mutex
lastBlock Block // latest block commited
canReceiveMessages atomic.Bool
finishCtx context.Context
Expand All @@ -66,6 +73,7 @@ type Epoch struct {
futureMessages messagesFromNode
round uint64 // The current round we notarize
maxRoundWindow uint64
maxPendingBlocks int
}

func NewEpoch(conf EpochConfig) (*Epoch, error) {
Expand All @@ -82,6 +90,9 @@ func (e *Epoch) AdvanceTime(t time.Duration) {

// HandleMessage notifies the engine about a reception of a message.
func (e *Epoch) HandleMessage(msg *Message, from NodeID) error {
e.lock.Lock()
defer e.lock.Unlock()

// Guard against receiving messages before we are ready to handle them.
if !e.canReceiveMessages.Load() {
e.Logger.Warn("Cannot receive a message")
Expand Down Expand Up @@ -112,11 +123,13 @@ func (e *Epoch) HandleMessage(msg *Message, from NodeID) error {
}

func (e *Epoch) init() error {
e.sched = NewScheduler()
e.finishCtx, e.finishFn = context.WithCancel(context.Background())
e.nodes = e.Comm.ListNodes()
e.quorumSize = Quorum(len(e.nodes))
e.rounds = make(map[uint64]*Round)
e.maxRoundWindow = defaultMaxRoundWindow
e.maxPendingBlocks = defaultMaxPendingBlocks
e.eligibleNodeIDs = make(map[string]struct{}, len(e.nodes))
e.futureMessages = make(messagesFromNode, len(e.nodes))
for _, node := range e.nodes {
Expand Down Expand Up @@ -406,10 +419,31 @@ func (e *Epoch) handleFinalizationMessage(message *Finalization, from NodeID) er
return nil
}

// If we have not received the proposal yet, we won't have a Round object in e.rounds,
// yet we may receive the corresponding finalization.
// This may happen if we're asynchronously verifying the proposal at the moment.
var pendingProposal bool
if _, exists := e.rounds[finalization.Round]; !exists && e.round == finalization.Round {
pendingProposal = true
}

// This finalization may correspond to a proposal from a future round, or to the proposal of the current round
// which we are still verifying.
if (e.round < finalization.Round && finalization.Round-e.round < e.maxRoundWindow) || pendingProposal {
e.Logger.Debug("Got vote from round too far in the future", zap.Uint64("round", finalization.Round), zap.Uint64("my round", e.round))
msgsForRound, exists := e.futureMessages[string(from)][finalization.Round]
if !exists {
msgsForRound = &messagesForRound{}
e.futureMessages[string(from)][finalization.Round] = msgsForRound
}
msgsForRound.finalization = message
return nil
}

// Have we already finalized this round?
round, exists := e.rounds[finalization.Round]
if !exists {
e.Logger.Debug("Received finalization for an unknown round", zap.Uint64("round", finalization.Round))
e.Logger.Debug("Received finalization for an unknown round", zap.Uint64("ourRound", e.round), zap.Uint64("round", finalization.Round))
return nil
}

Expand All @@ -427,13 +461,46 @@ func (e *Epoch) handleFinalizationMessage(message *Finalization, from NodeID) er
return e.maybeCollectFinalizationCertificate(round)
}

func (e *Epoch) handleVoteMessage(message *Vote, _ NodeID) error {
func (e *Epoch) handleVoteMessage(message *Vote, from NodeID) error {
vote := message.Vote

// Only process point to point votes.
// This is needed to prevent a malicious node from sending us a vote of a different node for a future round.
// Since we only verify the vote when it's due time, this will effectively block us from saving the real vote
// from the real node for a future round.
if !from.Equals(message.Signature.Signer) {
e.Logger.Debug("Received a vote signed by a different party than sent it",
zap.Stringer("signer", message.Signature.Signer), zap.Stringer("sender", from),
zap.Stringer("digest", vote.Digest))
return nil
}

// If we have not received the proposal yet, we won't have a Round object in e.rounds,
// yet we may receive the corresponding vote.
// This may happen if we're asynchronously verifying the proposal at the moment.
var pendingProposal bool
if _, exists := e.rounds[vote.Round]; !exists && e.round == vote.Round {
pendingProposal = true
}

// This vote may correspond to a proposal from a future round, or to the proposal of the current round
// which we are still verifying.
if (e.round < vote.Round && vote.Round-e.round < e.maxRoundWindow) || pendingProposal {
e.Logger.Debug("Got vote from round too far in the future", zap.Uint64("round", vote.Round), zap.Uint64("my round", e.round))
msgsForRound, exists := e.futureMessages[string(from)][vote.Round]
if !exists {
msgsForRound = &messagesForRound{}
e.futureMessages[string(from)][vote.Round] = msgsForRound
}
msgsForRound.vote = message
return nil
}

// TODO: what if we've received a vote for a round we didn't instantiate yet?
round, exists := e.rounds[vote.Round]
if !exists {
e.Logger.Debug("Received a vote for a non existent round", zap.Uint64("round", vote.Round))
e.Logger.Debug("Received a vote for a non existent round",
zap.Uint64("round", vote.Round), zap.Uint64("our round", e.round))
return nil
}

Expand Down Expand Up @@ -705,11 +772,24 @@ func (e *Epoch) handleBlockMessage(message *BlockMessage, _ NodeID) error {
return nil
}

pendingBlocks := e.sched.Size()
if pendingBlocks > e.maxPendingBlocks {
e.Logger.Warn("Too many blocks being verified to ingest another one", zap.Int("pendingBlocks", pendingBlocks))
return nil
}

vote := message.Vote
from := vote.Signature.Signer

md := block.BlockHeader()

e.Logger.Debug("Handling block message", zap.Stringer("digest", md.Digest), zap.Uint64("round", md.Round))

// Don't bother processing blocks from the past
if e.round > md.Round {
return nil
}

// Ignore block messages sent by us
if e.ID.Equals(from) {
e.Logger.Debug("Got a BlockMessage from ourselves or created by us")
Expand Down Expand Up @@ -762,31 +842,70 @@ func (e *Epoch) handleBlockMessage(message *BlockMessage, _ NodeID) error {
return nil
}

if !e.storeProposal(block) {
e.Logger.Warn("Unable to store proposed block for the round", zap.Stringer("NodeID", from), zap.Uint64("round", md.Round))
// TODO: timeout
}
// Create a task that will verify the block in the future, after its predecessors have also been verified.
task := e.scheduleBlockVerification(block, md, from, vote)

// Once we have stored the proposal, we have a Round object for the round.
// We store the vote to prevent verifying its signature again.
round, exists := e.rounds[md.Round]
if !exists {
// This shouldn't happen, but in case it does, return an error
return fmt.Errorf("programming error: round %d not found", md.Round)
}
round.votes[string(vote.Signature.Signer)] = &vote
// isBlockReadyToBeScheduled checks if the block is known to us either from some previous round,
// or from storage. If so, then we have verified it in the past, since only verified blocks are saved in memory.
canBeImmediatelyVerified := e.isBlockReadyToBeScheduled(md.Seq, md.Prev)

if err := block.Verify(); err != nil {
e.Logger.Debug("Failed verifying block", zap.Error(err))
return nil
}
record := BlockRecord(md, block.Bytes())
if err := e.WAL.Append(record); err != nil {
e.Logger.Error("Failed appending block to WAL", zap.Error(err))
return err
// Schedule the block to be verified once its direct predecessor have been verified,
// or if it can be verified immediately.
e.Logger.Debug("Scheduling block verification", zap.Uint64("round", md.Round))
e.sched.Schedule(md.Digest, task, md.Prev, canBeImmediatelyVerified)

return nil
}

func (e *Epoch) scheduleBlockVerification(block Block, md BlockHeader, from NodeID, vote Vote) func() {
return func() {
e.lock.Lock()
defer e.lock.Unlock()

if err := block.Verify(); err != nil {
e.Logger.Debug("Failed verifying block", zap.Error(err))
return
}
record := BlockRecord(md, block.Bytes())
e.WAL.Append(record)

if !e.storeProposal(block) {
e.Logger.Warn("Unable to store proposed block for the round", zap.Stringer("NodeID", from), zap.Uint64("round", md.Round))
return
// TODO: timeout
}

// Once we have stored the proposal, we have a Round object for the round.
// We store the vote to prevent verifying its signature again.
round, exists := e.rounds[md.Round]
if !exists {
// This shouldn't happen, but in case it does, return an error
e.Logger.Error("programming error: round not found", zap.Uint64("round", md.Round))
return
}
round.votes[string(vote.Signature.Signer)] = &vote

if err := e.doProposed(block, vote, from); err != nil {
e.Logger.Debug("Failed voting on block", zap.Error(err))
}
}
}

return e.doProposed(block, vote)
func (e *Epoch) isBlockReadyToBeScheduled(seq uint64, prev Digest) bool {
var ready bool

if seq > 0 {
// A block can be scheduled if its predecessor either exists in storage,
// or there exists a round object for it.
// Since we only create a round object after we verify the block,
// it means we have verified this block in the past.
_, ok := e.locateBlock(seq-1, prev[:])
ready = ok
} else {
// The first block is always ready to be scheduled
ready = true
}
return ready
}

func (e *Epoch) wasBlockAlreadyVerified(from NodeID, md BlockHeader) bool {
Expand Down Expand Up @@ -897,12 +1016,39 @@ func (e *Epoch) locateBlock(seq uint64, digest []byte) (Block, bool) {
return nil, false
}

func (e *Epoch) proposeBlock() error {
block, ok := e.BlockBuilder.BuildBlock(e.finishCtx, e.Metadata())
if !ok {
return errors.New("failed to build block")
func (e *Epoch) buildBlock() {
metadata := e.metadata()

task := e.scheduleBlockBuilding(metadata)

// We set the task ID to be the hash of the previous block, since we don't know the digest
// of the block before it is built.
// We know, however, that any block before or after the previous block won't
// have this digest, under the assumption that the hash function is collision resistant.
// TODO: Inject this hash as a dependency and don't use SHA256 as a hardcoded function
taskID := sha256.Sum256(metadata.Prev[:])

e.Logger.Debug("Scheduling block building", zap.Uint64("round", metadata.Round))
canBeImmediatelyVerified := e.isBlockReadyToBeScheduled(metadata.Seq, metadata.Prev)
e.sched.Schedule(taskID, task, metadata.Prev, canBeImmediatelyVerified)
}

func (e *Epoch) scheduleBlockBuilding(metadata ProtocolMetadata) func() {
return func() {
block, ok := e.BlockBuilder.BuildBlock(e.finishCtx, metadata)
if !ok {
e.Logger.Warn("Failed building block")
return
}

e.lock.Lock()
defer e.lock.Unlock()

e.proposeBlock(block)
}
}

func (e *Epoch) proposeBlock(block Block) error {
md := block.BlockHeader()

// Write record to WAL before broadcasting it, so that
Expand Down Expand Up @@ -945,6 +1091,13 @@ func (e *Epoch) proposeBlock() error {
}

func (e *Epoch) Metadata() ProtocolMetadata {
e.lock.Lock()
defer e.lock.Unlock()

return e.metadata()
}

func (e *Epoch) metadata() ProtocolMetadata {
var prev Digest
seq := e.Storage.Height()
if e.lastBlock != nil {
Expand All @@ -968,7 +1121,8 @@ func (e *Epoch) startRound() error {
leaderForCurrentRound := LeaderForRound(e.nodes, e.round)

if e.ID.Equals(leaderForCurrentRound) {
return e.proposeBlock()
e.buildBlock()
return nil
}

// If we're not the leader, check if we have received a proposal earlier for this round
Expand All @@ -980,7 +1134,7 @@ func (e *Epoch) startRound() error {
return e.handleBlockMessage(msgsForRound.proposal, leaderForCurrentRound)
}

func (e *Epoch) doProposed(block Block, voteFromLeader Vote) error {
func (e *Epoch) doProposed(block Block, voteFromLeader Vote, from NodeID) error {
vote, err := e.voteOnBlock(block)
if err != nil {
return err
Expand Down Expand Up @@ -1075,6 +1229,8 @@ func (e *Epoch) storeNotarization(notarization Notarization) error {
}

func (e *Epoch) maybeLoadFutureMessages(round uint64) {
e.Logger.Debug("Loading messages received for this round in the past", zap.Uint64("round", round))

for from, messagesFromNode := range e.futureMessages {
if msgs, exists := messagesFromNode[round]; exists {
if msgs.proposal != nil {
Expand Down Expand Up @@ -1102,11 +1258,6 @@ func (e *Epoch) maybeLoadFutureMessages(round uint64) {
func (e *Epoch) storeProposal(block Block) bool {
md := block.BlockHeader()

// Don't bother processing blocks from the past
if e.round > md.Round {
return false
}

// Have we already received a block from that node?
// If so, it cannot change its mind and send us a different block.
if _, exists := e.rounds[md.Round]; exists {
Expand Down
17 changes: 16 additions & 1 deletion epoch_multinode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func TestSimplexMultiNodeSimple(t *testing.T) {
for _, n := range instances {
n.ledger.waitForBlockCommit(uint64(seq))
}
bb.triggerNewBlock()
}
}

Expand Down Expand Up @@ -147,6 +146,22 @@ func (tw *testWAL) Append(b []byte) error {
return err
}

func (tw *testWAL) assertWALSize(n int) {
tw.lock.Lock()
defer tw.lock.Unlock()

for {
rawRecords, err := tw.WriteAheadLog.ReadAll()
require.NoError(tw.t, err)

if len(rawRecords) == n {
return
}

tw.signal.Wait()
}
}

func (tw *testWAL) assertNotarization(round uint64) {
tw.lock.Lock()
defer tw.lock.Unlock()
Expand Down
Loading

0 comments on commit 92eb71e

Please sign in to comment.