Skip to content

Commit

Permalink
fix unsafe map usage in singletonMode (#665)
Browse files Browse the repository at this point in the history
* fix singletonMode unsafe map

* update lint issues

---------

Co-authored-by: a3sroot <[email protected]>
  • Loading branch information
JohnRoesler and a3sroot authored Jan 30, 2024
1 parent d763684 commit 567cb96
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
11 changes: 7 additions & 4 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type executor struct {
jobOutRequest chan jobOutRequest
stopTimeout time.Duration
done chan error
singletonRunners map[uuid.UUID]singletonRunner
singletonRunners *sync.Map // map[uuid.UUID]singletonRunner
limitMode *limitModeConfig
elector Elector
locker Locker
Expand Down Expand Up @@ -67,7 +67,7 @@ func (e *executor) start() {
limitModeJobsWg := &waitGroupWithMutex{}

// create a fresh map for tracking singleton runners
e.singletonRunners = make(map[uuid.UUID]singletonRunner)
e.singletonRunners = &sync.Map{}

// start the for leap that is the executor
// selecting on channels for work to do
Expand Down Expand Up @@ -151,15 +151,18 @@ func (e *executor) start() {
if j.singletonMode {
// for singleton mode, get the existing runner for the job
// or spin up a new one
runner, ok := e.singletonRunners[jIn.id]
runner := &singletonRunner{}
runnerSrc, ok := e.singletonRunners.Load(jIn.id)
if !ok {
runner.in = make(chan jobIn, 1000)
if j.singletonLimitMode == LimitModeReschedule {
runner.rescheduleLimiter = make(chan struct{}, 1)
}
e.singletonRunners[jIn.id] = runner
e.singletonRunners.Store(jIn.id, runner)
singletonJobsWg.Add(1)
go e.singletonModeRunner("singleton-"+jIn.id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter)
} else {
runner = runnerSrc.(*singletonRunner)
}

if j.singletonLimitMode == LimitModeReschedule {
Expand Down
2 changes: 1 addition & 1 deletion scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
exec := executor{
stopCh: make(chan struct{}),
stopTimeout: time.Second * 10,
singletonRunners: make(map[uuid.UUID]singletonRunner),
singletonRunners: nil,
logger: &noOpLogger{},

jobsIn: make(chan jobIn),
Expand Down

0 comments on commit 567cb96

Please sign in to comment.