Skip to content

Commit

Permalink
Fix when->timeout canceling running jobs (#458)
Browse files Browse the repository at this point in the history
* split Job.Subscribe from Job.Run
* cancel start timeout event on job start
  • Loading branch information
tgross authored Aug 2, 2017
1 parent 5a20f21 commit 61e7713
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 17 deletions.
9 changes: 8 additions & 1 deletion core/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,15 @@ func (a *App) reload() error {
// HandlePolling sets up polling functions and write their quit channels
// back to our config
func (a *App) handlePolling() {

// we need to subscribe to events before we Run all the jobs
// to avoid races where a job finishes and fires events before
// other jobs are even subscribed to listen for them.
for _, job := range a.Jobs {
job.Subscribe(a.Bus)
}
for _, job := range a.Jobs {
job.Run(a.Bus)
job.Run()
}
for _, watch := range a.Watches {
watch.Run(a.Bus)
Expand Down
3 changes: 2 additions & 1 deletion core/signals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func getSignalTestConfig(t *testing.T) *App {
func TestTerminateSignal(t *testing.T) {
app := getSignalTestConfig(t)
bus := app.Bus
app.Jobs[0].Run(bus)
app.Jobs[0].Subscribe(bus)
app.Jobs[0].Run()

app.Terminate()
bus.Wait()
Expand Down
1 change: 1 addition & 0 deletions events/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type EventHandler struct {
func (evh *EventHandler) Subscribe(bus *EventBus, isInternal ...bool) {
evh.wg.Add(1)
bus.Register(evh, isInternal...)
evh.Bus = bus
}

// Unsubscribe removes the EventHandler from the list of handlers
Expand Down
22 changes: 12 additions & 10 deletions jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ type Job struct {
healthCheckName string

// starting events
startEvent events.Event
startTimeout time.Duration
startsRemain int
startEvent events.Event
startTimeout time.Duration
startsRemain int
startTimeoutEvent events.Event

// stopping events
stoppingWaitEvent events.Event
Expand Down Expand Up @@ -162,6 +163,7 @@ func (job *Job) HealthCheck(ctx context.Context) {

// StartJob runs the Job's executable
func (job *Job) StartJob(ctx context.Context) {
job.startTimeoutEvent = events.NonEvent
job.setStatus(statusUnknown)
if job.exec != nil {
job.exec.Run(ctx, job.Bus)
Expand All @@ -176,9 +178,7 @@ func (job *Job) Kill() {
}

// Run executes the event loop for the Job
func (job *Job) Run(bus *events.EventBus) {
job.Subscribe(bus)
job.Bus = bus
func (job *Job) Run() {
ctx, cancel := context.WithCancel(context.Background())

if job.frequency > 0 {
Expand All @@ -190,8 +190,11 @@ func (job *Job) Run(bus *events.EventBus) {
fmt.Sprintf("%s.heartbeat", job.Name))
}
if job.startTimeout > 0 {
events.NewEventTimeout(ctx, job.Rx, job.startTimeout,
fmt.Sprintf("%s.wait-timeout", job.Name))
timeoutName := fmt.Sprintf("%s.wait-timeout", job.Name)
events.NewEventTimeout(ctx, job.Rx, job.startTimeout, timeoutName)
job.startTimeoutEvent = events.Event{events.TimerExpired, timeoutName}
} else {
job.startTimeoutEvent = events.NonEvent
}

go func() {
Expand All @@ -215,7 +218,6 @@ func (job *Job) Run(bus *events.EventBus) {
func (job *Job) processEvent(ctx context.Context, event events.Event) bool {
runEverySource := fmt.Sprintf("%s.run-every", job.Name)
heartbeatSource := fmt.Sprintf("%s.heartbeat", job.Name)
startTimeoutSource := fmt.Sprintf("%s.wait-timeout", job.Name)
var healthCheckName string
if job.healthCheckExec != nil {
healthCheckName = job.healthCheckExec.Name
Expand All @@ -233,7 +235,7 @@ func (job *Job) processEvent(ctx context.Context, event events.Event) bool {
job.SendHeartbeat()
}
}
case events.Event{events.TimerExpired, startTimeoutSource}:
case job.startTimeoutEvent:
job.Bus.Publish(events.Event{
Code: events.TimerExpired, Source: job.Name})
job.Rx <- events.Event{Code: events.Quit, Source: job.Name}
Expand Down
52 changes: 47 additions & 5 deletions jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ func TestJobRunSafeClose(t *testing.T) {
cfg := &Config{Name: "myjob", Exec: "sleep 10"} // don't want exec to finish
cfg.Validate(noop)
job := NewJob(cfg)
job.Run(bus)
job.Subscribe(bus)
job.Run()
bus.Publish(events.GlobalStartup)
job.Quit()
bus.Wait()
Expand Down Expand Up @@ -46,7 +47,8 @@ func TestJobRunStartupTimeout(t *testing.T) {
When: &WhenConfig{Source: "never", Once: "startup", Timeout: "100ms"}}
cfg.Validate(noop)
job := NewJob(cfg)
job.Run(bus)
job.Subscribe(bus)
job.Run()
job.Bus.Publish(events.GlobalStartup)

time.Sleep(200 * time.Millisecond)
Expand Down Expand Up @@ -75,6 +77,43 @@ func TestJobRunStartupTimeout(t *testing.T) {
}
}

// A Job should not timeout if started before the startupTimeout
func TestJobRunStartupNoTimeout(t *testing.T) {
bus := events.NewEventBus()
cfg := &Config{Name: "myjob", Exec: "sleep 5",
When: &WhenConfig{Timeout: "500ms"}}
cfg.Validate(noop)
cfg.whenEvent = events.GlobalStartup

job := NewJob(cfg)
job.Subscribe(bus)
job.Run()
job.Bus.Publish(events.GlobalStartup)

time.Sleep(1000 * time.Millisecond)
defer func() {
if r := recover(); r != nil {
t.Fatalf("panicked but should not: sent to closed Subscriber")
}
}()
bus.Publish(events.QuitByClose)
bus.Wait()
results := bus.DebugEvents()

got := map[events.Event]int{}
for _, result := range results {
got[result]++
}
if !reflect.DeepEqual(got, map[events.Event]int{
events.GlobalStartup: 1,
{Code: events.Stopping, Source: "myjob"}: 1,
{Code: events.Stopped, Source: "myjob"}: 1,
events.QuitByClose: 1,
}) {
t.Fatalf("expected timeout after startup but got:\n%v", results)
}
}

func TestJobRunRestarts(t *testing.T) {
runRestartsTest := func(restarts interface{}, expected int) {
bus := events.NewEventBus()
Expand All @@ -88,7 +127,8 @@ func TestJobRunRestarts(t *testing.T) {
cfg.Validate(noop)
job := NewJob(cfg)

job.Run(bus)
job.Subscribe(bus)
job.Run()
job.Bus.Publish(events.GlobalStartup)
time.Sleep(100 * time.Millisecond) // TODO: we can't force this, right?
exitOk := events.Event{Code: events.ExitSuccess, Source: "myjob"}
Expand Down Expand Up @@ -125,7 +165,8 @@ func TestJobRunPeriodic(t *testing.T) {
}
cfg.Validate(noop)
job := NewJob(cfg)
job.Run(bus)
job.Subscribe(bus)
job.Run()
job.Bus.Publish(events.GlobalStartup)
exitOk := events.Event{Code: events.ExitSuccess, Source: "myjob"}
exitFail := events.Event{Code: events.ExitFailed, Source: "myjob"}
Expand Down Expand Up @@ -159,7 +200,8 @@ func TestJobMaintenance(t *testing.T) {
cfg.Validate(noop)
job := NewJob(cfg)
job.setStatus(startingState)
job.Run(bus)
job.Subscribe(bus)
job.Run()
job.Bus.Publish(event)
job.Quit()
return job.GetStatus()
Expand Down

0 comments on commit 61e7713

Please sign in to comment.