Improve concurrency and reason about code correctness
Signed-off-by: Yacov Manevich <[email protected]>
yacovm committed Jan 16, 2025
1 parent 675517d commit edb1192
Showing 2 changed files with 73 additions and 15 deletions.
49 changes: 46 additions & 3 deletions sched.go
@@ -47,6 +47,9 @@ func (as *scheduler) run() {
if as.close {
return
}
if len(as.ready) > 0 {
continue
}
as.signal.Wait()
}
}
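
The change above is the standard condition-variable idiom of re-checking the predicate before calling Wait, so that a Signal delivered while the scheduler goroutine is busy (and therefore not yet waiting) is not lost. Below is a minimal, self-contained sketch of that pattern; the type and field names are illustrative and not the exact ones used in sched.go.

package sched

import "sync"

// waitLoop is an illustrative stand-in for the scheduler goroutine.
type waitLoop struct {
    lock   sync.Mutex
    signal *sync.Cond
    ready  []func()
    closed bool // set, under the lock, by a Close method omitted here
}

func newWaitLoop() *waitLoop {
    l := &waitLoop{}
    l.signal = sync.NewCond(&l.lock)
    return l
}

func (l *waitLoop) run() {
    l.lock.Lock()
    defer l.lock.Unlock()
    for {
        if l.closed {
            return
        }
        // Re-check the predicate before sleeping: if a task became ready while
        // the lock was released, handle it now instead of waiting for a Signal
        // that may already have been delivered.
        if len(l.ready) > 0 {
            task := l.ready[0]
            l.ready = l.ready[1:]
            l.lock.Unlock()
            task()
            l.lock.Lock()
            continue
        }
        l.signal.Wait() // atomically unlocks, sleeps, and re-locks on wake-up
    }
}

// add enqueues a task and wakes the loop if it is waiting.
func (l *waitLoop) add(task func()) {
    l.lock.Lock()
    defer l.lock.Unlock()
    l.ready = append(l.ready, task)
    l.signal.Signal()
}
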
@@ -60,10 +63,52 @@ func (as *scheduler) maybeExecuteTask() {
task, as.ready = as.ready[0], as.ready[1:]
as.lock.Unlock()
task.f()
as.lock.Lock()
}
}

/*
(a) While a task is being scheduled, the lock is held, and therefore the only instructions the scheduler goroutine
can be executing are those of 'dispatchTaskAndScheduleDependingTasks', since every other line in 'maybeExecuteTask'
requires the lock to be held. Inside 'dispatchTaskAndScheduleDependingTasks', the only line that does not require
the lock to be held is the execution of the task itself. It follows that it is not possible to schedule a new task
while tasks are being moved from pending to the ready queue, and vice versa.
(b) The Epoch schedules new tasks under a lock, and it computes whether a task is ready under that same lock.
Since each task in the Epoch first acquires that lock before doing anything else, if a task A finished executing
before the task B that depends on it was scheduled, then B cannot have been scheduled with a dependency on A and
yet not be ready, because computing whether B is ready to be scheduled is mutually exclusive with A's execution.
Therefore, if A has finished executing, it must be that B is ready to be executed.
(c) If a task is scheduled and is ready to run, it will be executed after a finite number of instructions.
The reason is that a ready task is inserted into the ready queue and the condition variable is then signaled.
The scheduler goroutine is either waiting on the condition variable, in which case it wakes up and executes the task,
or it is executing some instruction before it waits on the condition variable. In the latter case, the only time
the lock is not held is while a task is being executed; afterward, the lock is re-acquired in
'dispatchTaskAndScheduleDependingTasks'. It follows from (a) that if the lock is not held by the scheduler goroutine,
it will check the ready queue one more time just before waiting on the condition variable, and therefore even if the
signal is given while the scheduler goroutine is not yet waiting for it, a task that is scheduled and ready to run
will be executed after a finite number of instructions of the scheduler goroutine.
It remains to show that a task which is scheduled before it is ready is eventually executed as well.
Assume, toward a contradiction, that there exists a task B that is scheduled and not ready,
depends on a task A that finishes, and yet is never executed once A finishes.
We split into two distinct cases:
1) B is scheduled after A.
2) A is scheduled after B.
If (1) holds, then when B is scheduled it is not ready and is therefore inserted into pending.
It follows from (b) that A does not finish before B is inserted into pending.
At some point A finishes its execution, after which the scheduler goroutine
enters 'dispatchTaskAndScheduleDependingTasks', where it proceeds to remove the ID of A,
retrieve B from pending, add B to the ready queue, and perform another iteration inside 'maybeExecuteTask'.
It will then pop tasks from the ready queue and execute them until the queue is empty, and one of these
tasks will be B, contradicting the assumption.
If (2) holds, then when B is scheduled it is pending on A to finish, and the rest follows as in case (1).
*/

func (as *scheduler) Schedule(id Digest, f func(), prev Digest, ready bool) {
as.lock.Lock()
defer as.lock.Unlock()
@@ -91,10 +136,8 @@ func (as *scheduler) dispatchTaskAndScheduleDependingTasks(id Digest, task func(
return func() {
task()
as.lock.Lock()
defer as.lock.Unlock()
newlyReadyTasks := as.pending.Remove(id)
as.ready = append(as.ready, newlyReadyTasks...)
as.signal.Signal()
}
}
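
The correctness argument (a)–(c) above refers to Schedule, the pending set, the ready queue, and the wrapper built by dispatchTaskAndScheduleDependingTasks without showing them in full. The sketch below shows the overall shape that argument assumes; Digest as a plain byte array, pending as an ordinary map, and the exact points where the lock is taken and released are simplifications for illustration, not the repository's actual implementation.

package sched

import "sync"

// Digest identifies a task; in the real code it is a cryptographic hash.
type Digest [32]byte

type scheduler struct {
    lock    sync.Mutex
    signal  *sync.Cond
    ready   []func()            // wrapped tasks whose dependency already finished
    pending map[Digest][]func() // wrapped tasks keyed by the dependency they wait on
    close   bool
}

func NewScheduler() *scheduler {
    as := &scheduler{pending: make(map[Digest][]func())}
    as.signal = sync.NewCond(&as.lock)
    go as.run() // the single scheduler goroutine referred to in (a) and (c)
    return as
}

// Close stops the scheduler goroutine.
func (as *scheduler) Close() {
    as.lock.Lock()
    defer as.lock.Unlock()
    as.close = true
    as.signal.Signal()
}

func (as *scheduler) run() {
    as.lock.Lock()
    defer as.lock.Unlock()
    for {
        if as.close {
            return
        }
        for len(as.ready) > 0 {
            task := as.ready[0]
            as.ready = as.ready[1:]
            as.lock.Unlock()
            task() // the only point at which the scheduler goroutine does not hold the lock (a)
            as.lock.Lock()
        }
        as.signal.Wait()
    }
}

// Schedule registers a task. The Epoch calls it while holding its own lock, so
// computing 'ready' is mutually exclusive with the dependency's execution (b).
func (as *scheduler) Schedule(id Digest, f func(), prev Digest, ready bool) {
    as.lock.Lock()
    defer as.lock.Unlock()

    wrapped := as.dispatchTaskAndScheduleDependingTasks(id, f)
    if ready {
        as.ready = append(as.ready, wrapped)
        as.signal.Signal() // wake the scheduler goroutine if it is waiting (c)
        return
    }
    as.pending[prev] = append(as.pending[prev], wrapped)
}

// dispatchTaskAndScheduleDependingTasks wraps a task so that, once it has run,
// every task that was waiting on it is promoted to the ready queue. In this
// sketch the lock is simply taken and released here, and no Signal is needed:
// the scheduler goroutine itself ran the wrapper and re-checks the ready queue
// before it waits again.
func (as *scheduler) dispatchTaskAndScheduleDependingTasks(id Digest, task func()) func() {
    return func() {
        task()
        as.lock.Lock()
        defer as.lock.Unlock()
        as.ready = append(as.ready, as.pending[id]...)
        delete(as.pending, id)
    }
}
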

39 changes: 27 additions & 12 deletions sched_test.go
@@ -5,8 +5,10 @@ package simplex
 
 import (
     "crypto/rand"
+    rand2 "math/rand"
     "sync"
     "testing"
+    "time"
 
     "github.com/stretchr/testify/require"
 )
@@ -77,7 +79,7 @@ func TestAsyncScheduler(t *testing.T) {
         ticks <- struct{}{}
     })
 
-    t.Run("Executes several pending tasks concurrently", func(t *testing.T) {
+    t.Run("Executes several pending tasks concurrently in arbitrary order", func(t *testing.T) {
         as := NewScheduler()
         defer as.Close()
 
@@ -90,32 +92,45 @@
         wg.Add(n)
 
         var prevTask Digest
+        tasks := make([]func(), n)
 
         for i := 0; i < n; i++ {
             taskID := makeDigest(t)
-            scheduleTask(&lock, finished, prevTask, taskID, &wg, as, i)
+            tasks[i] = scheduleTask(&lock, finished, prevTask, taskID, &wg, as, i)
             // Next iteration's previous task ID is current task ID
             prevTask = taskID
         }
 
+        seed := time.Now().UnixNano()
+        r := rand2.New(rand2.NewSource(seed))
+
+        for _, index := range r.Perm(n) {
+            tasks[index]()
+        }
+
         wg.Wait()
     })
 }
 
-func scheduleTask(lock *sync.Mutex, finished map[Digest]struct{}, dependency Digest, id Digest, wg *sync.WaitGroup, as *scheduler, i int) {
-    lock.Lock()
-    defer lock.Unlock()
+func scheduleTask(lock *sync.Mutex, finished map[Digest]struct{}, dependency Digest, id Digest, wg *sync.WaitGroup, as *scheduler, i int) func() {
+    var dep Digest
+    copy(dep[:], dependency[:])
 
-    _, hasFinished := finished[dependency]
-
-    task := func() {
+    return func() {
        lock.Lock()
        defer lock.Unlock()
-        finished[id] = struct{}{}
-        wg.Done()
-    }
 
-    as.Schedule(id, task, dependency, i == 0 || hasFinished)
+        _, hasFinished := finished[dep]
+
+        task := func() {
+            lock.Lock()
+            defer lock.Unlock()
+            finished[id] = struct{}{}
+            wg.Done()
+        }
+
+        as.Schedule(id, task, dep, i == 0 || hasFinished)
+    }
 }
 
 func makeDigest(t *testing.T) Digest {
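
In the updated test, scheduleTask no longer calls Schedule immediately; it returns a closure, so the test first builds all the closures in dependency order and then invokes them in a random permutation seeded from the current time, exercising the scheduler under an arbitrary ordering of Schedule calls. Each closure also keeps its own copy of the dependency digest. The following condensed sketch shows the same build-then-shuffle pattern with illustrative names, an int standing in for Digest, and a schedule callback standing in for scheduler.Schedule.

package sched

import (
    "math/rand"
    "sync"
    "time"
)

// scheduleInRandomOrder builds n scheduling closures in dependency order and
// fires them in a random permutation. schedule is expected to run the given
// task asynchronously (as scheduler.Schedule does); running it synchronously
// here would deadlock on lock. The caller waits on the returned WaitGroup.
func scheduleInRandomOrder(n int, schedule func(id int, f func(), dep int, ready bool)) *sync.WaitGroup {
    var (
        lock     sync.Mutex
        finished = make(map[int]struct{})
        wg       sync.WaitGroup
    )
    wg.Add(n)

    closures := make([]func(), n)
    for i := 0; i < n; i++ {
        id, dep := i, i-1 // task i depends on task i-1; each closure gets its own copy
        closures[i] = func() {
            lock.Lock()
            defer lock.Unlock()

            _, hasFinished := finished[dep]

            task := func() {
                lock.Lock()
                defer lock.Unlock()
                finished[id] = struct{}{}
                wg.Done()
            }

            schedule(id, task, dep, id == 0 || hasFinished)
        }
    }

    // Fire the closures in a random permutation so the scheduler sees an
    // arbitrary interleaving of Schedule calls.
    r := rand.New(rand.NewSource(time.Now().UnixNano()))
    for _, index := range r.Perm(n) {
        closures[index]()
    }
    return &wg
}
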
