Skip to content

Commit

Permalink
[WIP] Build blocks asynchronously
Browse files Browse the repository at this point in the history
When it is a node's turn to propose a block, it calls the BlockBuilder's BuildBlock method.
However, this method may take a long time to complete, and during this time, the Epoch instance
cannot process messages (since proposing a new block may be called indirectly via HandleMessage).

This commit adds an asynchronous task scheduler which the Epoch uses to register a callback to make itself propose the block
after building it asynchronously.

Signed-off-by: Yacov Manevich <[email protected]>
  • Loading branch information
yacovm committed Jan 2, 2025
1 parent bafb383 commit ed8c038
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 6 deletions.
33 changes: 28 additions & 5 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"simplex/record"
"sync"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -55,6 +56,8 @@ type EpochConfig struct {
type Epoch struct {
EpochConfig
// Runtime
as *asyncScheduler
lock sync.Mutex
lastBlock Block // latest block commited
canReceiveMessages bool
finishCtx context.Context
Expand Down Expand Up @@ -82,6 +85,9 @@ func (e *Epoch) AdvanceTime(t time.Duration) {

// HandleMessage notifies the engine about a reception of a message.
func (e *Epoch) HandleMessage(msg *Message, from NodeID) error {
e.lock.Lock()
defer e.lock.Unlock()

// Guard against receiving messages before we are ready to handle them.
if !e.canReceiveMessages {
e.Logger.Warn("Cannot receive a message")
Expand Down Expand Up @@ -117,6 +123,7 @@ func (e *Epoch) init() error {
e.canReceiveMessages = true
}()

e.as = newAsyncScheduler()
e.finishCtx, e.finishFn = context.WithCancel(context.Background())
e.nodes = e.Comm.ListNodes()
e.quorumSize = Quorum(len(e.nodes))
Expand All @@ -142,6 +149,9 @@ func (e *Epoch) init() error {
}

func (e *Epoch) Start() error {
e.lock.Lock()
defer e.lock.Unlock()

return e.syncFromWal()
}

Expand Down Expand Up @@ -763,12 +773,24 @@ func (e *Epoch) locateBlock(seq uint64, digest []byte) (Block, bool) {
return nil, false
}

func (e *Epoch) proposeBlock() error {
block, ok := e.BlockBuilder.BuildBlock(e.finishCtx, e.Metadata())
if !ok {
return errors.New("failed to build block")
func (e *Epoch) buildBlock() {
task := func() {
block, ok := e.BlockBuilder.BuildBlock(e.finishCtx, e.Metadata())
if !ok {
e.Logger.Warn("Failed building block")
return
}

e.lock.Lock()
defer e.lock.Unlock()

e.proposeBlock(block)
}

e.as.Schedule(task)
}

func (e *Epoch) proposeBlock(block Block) error {
md := block.BlockHeader()

// Write record to WAL before broadcasting it, so that
Expand Down Expand Up @@ -831,7 +853,8 @@ func (e *Epoch) startRound() error {
leaderForCurrentRound := leaderForRound(e.nodes, e.round)

if e.ID.Equals(leaderForCurrentRound) {
return e.proposeBlock()
e.buildBlock()
return nil
}

// If we're not the leader, check if we have received a proposal earlier for this round
Expand Down
1 change: 0 additions & 1 deletion epoch_multinode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func TestSimplexMultiNodeSimple(t *testing.T) {
for _, n := range instances {
n.ledger.waitForBlockCommit(uint64(seq))
}
bb.triggerNewBlock()
}
}

Expand Down
66 changes: 66 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,69 @@ func RetrieveLastBlockFromStorage(s Storage) (Block, error) {
}
return lastBlock, nil
}

type asyncScheduler struct {
tasks chan func()
busy chan struct{}
close chan struct{}
}

func newAsyncScheduler() *asyncScheduler {
as := &asyncScheduler{
tasks: make(chan func()),
busy: make(chan struct{}, 1),
close: make(chan struct{}),
}

go as.run()

return as
}

func (as *asyncScheduler) Close() {
select {
case <-as.close:
return
}

close(as.close)
}

func (as *asyncScheduler) run() {
for {
select {
case task := <-as.tasks:
task()
<-as.busy
case <-as.close:
return
}
}
}

func (as *asyncScheduler) Schedule(task func()) {
if !as.acquireToken() {
return
}

as.scheduleTask(task)
}

func (as *asyncScheduler) scheduleTask(task func()) {
select {
case as.tasks <- task:
case <-as.close:
default:
}
}

func (as *asyncScheduler) acquireToken() bool {
select {
case as.busy <- struct{}{}:
return true
case <-as.close:
return false
default:
return false
}
}

0 comments on commit ed8c038

Please sign in to comment.