Skip to content

Commit

Permalink
Build blocks asynchronously
Browse files Browse the repository at this point in the history
When it is a node's turn to propose a block, it calls the BlockBuilder's BuildBlock method.
However, this method may take a long time to complete, and during this time, the Epoch instance
cannot process messages (since proposing a new block may be called indirectly via HandleMessage).

This commit adds an asynchronous task scheduler which the Epoch uses to register a callback to make itself propose the block
after building it asynchronously.

Signed-off-by: Yacov Manevich <[email protected]>
  • Loading branch information
yacovm committed Jan 2, 2025
1 parent ec43626 commit b6f9417
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 10 deletions.
84 changes: 75 additions & 9 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"simplex/record"
"sync"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -55,6 +56,8 @@ type EpochConfig struct {
type Epoch struct {
EpochConfig
// Runtime
as *asyncScheduler
lock sync.Mutex
lastBlock Block // latest block commited
canReceiveMessages bool
finishCtx context.Context
Expand Down Expand Up @@ -82,6 +85,9 @@ func (e *Epoch) AdvanceTime(t time.Duration) {

// HandleMessage notifies the engine about a reception of a message.
func (e *Epoch) HandleMessage(msg *Message, from NodeID) error {
e.lock.Lock()
defer e.lock.Unlock()

// Guard against receiving messages before we are ready to handle them.
if !e.canReceiveMessages {
e.Logger.Warn("Cannot receive a message")
Expand Down Expand Up @@ -117,6 +123,7 @@ func (e *Epoch) init() error {
e.canReceiveMessages = true
}()

e.as = newAsyncScheduler()
e.finishCtx, e.finishFn = context.WithCancel(context.Background())
e.nodes = e.Comm.ListNodes()
e.quorumSize = Quorum(len(e.nodes))
Expand All @@ -142,6 +149,9 @@ func (e *Epoch) init() error {
}

func (e *Epoch) Start() error {
e.lock.Lock()
defer e.lock.Unlock()

return e.syncFromWal()
}

Expand Down Expand Up @@ -250,10 +260,26 @@ func (e *Epoch) handleFinalizationMessage(message *Message, from NodeID) error {
return nil
}

var pendingProposal bool
if _, exists := e.rounds[finalization.Round]; !exists && e.round == finalization.Round {
pendingProposal = true
}

if (e.round < finalization.Round && finalization.Round-e.round < e.maxRoundWindow) || pendingProposal {
e.Logger.Debug("Got vote from round too far in the future", zap.Uint64("round", finalization.Round), zap.Uint64("my round", e.round))
msgsForRound, exists := e.futureMessages[string(from)][finalization.Round]
if !exists {
msgsForRound = &messagesForRound{}
e.futureMessages[string(from)][finalization.Round] = msgsForRound
}
msgsForRound.finalization = message
return nil
}

// Have we already finalized this round?
round, exists := e.rounds[finalization.Round]
if !exists {
e.Logger.Debug("Received finalization for an unknown round", zap.Uint64("round", finalization.Round))
e.Logger.Debug("Received finalization for an unknown round", zap.Uint64("ourRound", e.round), zap.Uint64("round", finalization.Round))
return nil
}

Expand All @@ -271,14 +297,37 @@ func (e *Epoch) handleFinalizationMessage(message *Message, from NodeID) error {
return e.maybeCollectFinalizationCertificate(round)
}

func (e *Epoch) handleVoteMessage(message *Message, _ NodeID) error {
func (e *Epoch) handleVoteMessage(message *Message, from NodeID) error {
msg := message.VoteMessage
vote := msg.Vote

// Only process a point to point votes
if !from.Equals(msg.Signature.Signer) {
e.Logger.Debug("Received a vote signed by a different party than sent it", zap.Stringer("signer", msg.Signature.Signer), zap.Stringer("sender", from))
return nil
}

var pendingProposal bool
if _, exists := e.rounds[vote.Round]; !exists && e.round == vote.Round {
pendingProposal = true
}

if (e.round < vote.Round && vote.Round-e.round < e.maxRoundWindow) || pendingProposal {
e.Logger.Debug("Got vote from round too far in the future", zap.Uint64("round", vote.Round), zap.Uint64("my round", e.round))
msgsForRound, exists := e.futureMessages[string(from)][vote.Round]
if !exists {
msgsForRound = &messagesForRound{}
e.futureMessages[string(from)][vote.Round] = msgsForRound
}
msgsForRound.vote = message
return nil
}

// TODO: what if we've received a vote for a round we didn't instantiate yet?
round, exists := e.rounds[vote.Round]
if !exists {
e.Logger.Debug("Received a vote for a non existent round", zap.Uint64("round", vote.Round))
e.Logger.Debug("Received a vote for a non existent round",
zap.Uint64("round", vote.Round), zap.Uint64("our round", e.round))
return nil
}

Expand Down Expand Up @@ -778,12 +827,24 @@ func (e *Epoch) locateBlock(seq uint64, digest []byte) (Block, bool) {
return nil, false
}

func (e *Epoch) proposeBlock() error {
block, ok := e.BlockBuilder.BuildBlock(e.finishCtx, e.Metadata())
if !ok {
return errors.New("failed to build block")
func (e *Epoch) buildBlock() {
task := func() {
block, ok := e.BlockBuilder.BuildBlock(e.finishCtx, e.Metadata())
if !ok {
e.Logger.Warn("Failed building block")
return
}

e.lock.Lock()
defer e.lock.Unlock()

e.proposeBlock(block)
}

e.as.Schedule(task)
}

func (e *Epoch) proposeBlock(block Block) error {
md := block.BlockHeader()

// Write record to WAL before broadcasting it, so that
Expand Down Expand Up @@ -846,7 +907,8 @@ func (e *Epoch) startRound() error {
leaderForCurrentRound := leaderForRound(e.nodes, e.round)

if e.ID.Equals(leaderForCurrentRound) {
return e.proposeBlock()
e.buildBlock()
return nil
}

// If we're not the leader, check if we have received a proposal earlier for this round
Expand Down Expand Up @@ -903,8 +965,10 @@ func (e *Epoch) voteOnBlock(block Block) (Vote, error) {
}

func (e *Epoch) increaseRound() {
e.Logger.Info(fmt.Sprintf("Moving to a new round (%d --> %d", e.round, e.round+1), zap.Uint64("round", e.round+1))
e.round++
e.Logger.Info(fmt.Sprintf("Moving to a new round (%d --> %d", e.round-1, e.round),
zap.Uint64("round", e.round),
zap.Stringer("leader", leaderForRound(e.nodes, e.round)))
}

func (e *Epoch) doNotarized() error {
Expand Down Expand Up @@ -955,6 +1019,8 @@ func (e *Epoch) storeNotarization(notarization Notarization) error {
}

func (e *Epoch) maybeLoadFutureMessages(round uint64) {
e.Logger.Debug("Loading messages received for this round in the past", zap.Uint64("round", round))

for from, messagesFromNode := range e.futureMessages {
if msgs, exists := messagesFromNode[round]; exists {
if msgs.proposal != nil {
Expand Down
1 change: 0 additions & 1 deletion epoch_multinode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func TestSimplexMultiNodeSimple(t *testing.T) {
for _, n := range instances {
n.ledger.waitForBlockCommit(uint64(seq))
}
bb.triggerNewBlock()
}
}

Expand Down
2 changes: 2 additions & 0 deletions epoch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func TestEpochSimpleFlow(t *testing.T) {
injectFinalization(t, e, block, NodeID{2})
injectFinalization(t, e, block, NodeID{3})

storage.waitForBlockCommit(uint64(i))

committedData := storage.data[uint64(i)].Block.Bytes()
require.Equal(t, block.Bytes(), committedData)
}
Expand Down
66 changes: 66 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,69 @@ func RetrieveLastBlockFromStorage(s Storage) (Block, error) {
}
return lastBlock, nil
}

type asyncScheduler struct {
tasks chan func()
busy chan struct{}
close chan struct{}
}

func newAsyncScheduler() *asyncScheduler {
as := &asyncScheduler{
tasks: make(chan func(), 1),
busy: make(chan struct{}, 1),
close: make(chan struct{}),
}

go as.run()

return as
}

func (as *asyncScheduler) Close() {
select {
case <-as.close:
return
}

close(as.close)
}

func (as *asyncScheduler) run() {
for {
select {
case task := <-as.tasks:
task()
<-as.busy
case <-as.close:
return
}
}
}

func (as *asyncScheduler) Schedule(task func()) {
if !as.acquireToken() {
return
}

as.scheduleTask(task)
}

func (as *asyncScheduler) scheduleTask(task func()) {
select {
case as.tasks <- task:
case <-as.close:
default:
}
}

func (as *asyncScheduler) acquireToken() bool {
select {
case as.busy <- struct{}{}:
return true
case <-as.close:
return false
default:
return false
}
}

0 comments on commit b6f9417

Please sign in to comment.