Skip to content

Commit

Permalink
Clean-up a few things around global/job shutdown
Browse files Browse the repository at this point in the history
This commit makes a few small tweaks around how shutdown occurs. I've added a
shared lock when setting a job as completed and also swapped the complete
channel to send a struct instead of a boolean (not much of a change).
  • Loading branch information
Justin Reagor committed Dec 7, 2017
1 parent 87bcd66 commit 4cc285b
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 14 deletions.
5 changes: 2 additions & 3 deletions core/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (a *App) Run() {
// signal handler above, and reload endpoint, only need to fire a
// GlobalShutdown across the event bus. Context handles everything after
// that process.
completedCh := make(chan bool)
completedCh := make(chan struct{}, 1)
go func() {
for {
select {
Expand Down Expand Up @@ -159,7 +159,6 @@ func (a *App) Run() {
log.Error(err)
break
}
cancel()
close(completedCh)
}
}
Expand Down Expand Up @@ -198,7 +197,7 @@ func (a *App) reload() error {

// HandlePolling sets up polling functions and write their quit channels
// back to our config
func (a *App) runTasks(ctx context.Context, completedCh chan bool) {
func (a *App) runTasks(ctx context.Context, completedCh chan struct{}) {
// we need to subscribe to events before we Run all the jobs
// to avoid races where a job finishes and fires events before
// other jobs are even subscribed to listen for them.
Expand Down
4 changes: 2 additions & 2 deletions core/signals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func getSignalEventTestConfig(signals []string) *App {
// by this same test, but that we don't have a separate unit test
// because they'll interfere with each other's state.
func TestTerminateSignal(t *testing.T) {
stopCh := make(chan bool)
stopCh := make(chan struct{}, 1)
app := getSignalTestConfig()
bus := app.Bus
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestTerminateSignal(t *testing.T) {
// Test handler for handling signal events SIGHUP (and SIGUSR2). Note that the
// SIGUSR1 is currently setup to handle reloading ContainerPilot's log file.
func TestSignalEvent(t *testing.T) {
stopCh := make(chan bool)
stopCh := make(chan struct{}, 1)
signals := []string{"SIGHUP", "SIGUSR2"}
app := getSignalEventTestConfig(signals)
bus := app.Bus
Expand Down
10 changes: 7 additions & 3 deletions jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type Job struct {
frequency time.Duration

// completed
IsComplete bool
IsComplete bool
completeLock *sync.RWMutex

events.Subscriber
events.Publisher
Expand All @@ -76,6 +77,7 @@ func NewJob(cfg *Config) *Job {
frequency: cfg.freqInterval,
}
job.statusLock = &sync.RWMutex{}
job.completeLock = &sync.RWMutex{}
job.Rx = make(chan events.Event, eventBufferSize)
if job.Name == "containerpilot" {
// right now this hardcodes the telemetry service to
Expand Down Expand Up @@ -119,6 +121,8 @@ func (job *Job) setStatus(status JobStatus) {
}

func (job *Job) setComplete() {
job.completeLock.Lock()
defer job.completeLock.Unlock()
job.IsComplete = true
}

Expand All @@ -130,7 +134,7 @@ func (job *Job) Kill() {
}

// Run executes the event loop for the Job
func (job *Job) Run(pctx context.Context, completedCh chan bool) {
func (job *Job) Run(pctx context.Context, completedCh chan struct{}) {
ctx, cancel := context.WithCancel(pctx)

if job.frequency > 0 {
Expand All @@ -152,7 +156,7 @@ func (job *Job) Run(pctx context.Context, completedCh chan bool) {
go func() {
defer func() {
job.cleanup(ctx, cancel)
completedCh <- true
completedCh <- struct{}{}
}()
for {
select {
Expand Down
12 changes: 6 additions & 6 deletions jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

func TestJobRunSafeClose(t *testing.T) {
bus := events.NewEventBus()
stopCh := make(chan bool)
stopCh := make(chan struct{}, 1)
cfg := &Config{
Name: "myjob",
Exec: "sleep 10",
Expand Down Expand Up @@ -50,7 +50,7 @@ func TestJobRunSafeClose(t *testing.T) {
// A Job should timeout if not started before the startupTimeout
func TestJobRunStartupTimeout(t *testing.T) {
bus := events.NewEventBus()
stopCh := make(chan bool)
stopCh := make(chan struct{}, 1)
cfg := &Config{Name: "myjob", Exec: "true",
When: &WhenConfig{Source: "never", Once: "startup", Timeout: "100ms"}}
cfg.Validate(noop)
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestJobRunStartupTimeout(t *testing.T) {
// A Job should not timeout if started before the startupTimeout
func TestJobRunStartupNoTimeout(t *testing.T) {
bus := events.NewEventBus()
stopCh := make(chan bool)
stopCh := make(chan struct{}, 1)
cfg := &Config{Name: "myjob", Exec: "sleep 5",
When: &WhenConfig{Timeout: "500ms"}}
cfg.Validate(noop)
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestJobRunStartupNoTimeout(t *testing.T) {
func TestJobRunRestarts(t *testing.T) {
runRestartsTest := func(restarts interface{}, expected int) {
bus := events.NewEventBus()
stopCh := make(chan bool)
stopCh := make(chan struct{}, 1)
cfg := &Config{
Name: "myjob",
whenEvent: events.GlobalStartup,
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestJobRunRestarts(t *testing.T) {

func TestJobRunPeriodic(t *testing.T) {
bus := events.NewEventBus()
stopCh := make(chan bool)
stopCh := make(chan struct{}, 1)
cfg := &Config{
Name: "myjob",
Exec: []string{"./testdata/test.sh", "doStuff", "runPeriodicTest"},
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestJobRunPeriodic(t *testing.T) {
func TestJobMaintenance(t *testing.T) {
testFunc := func(t *testing.T, startingState JobStatus, event events.Event) JobStatus {
bus := events.NewEventBus()
stopCh := make(chan bool)
stopCh := make(chan struct{}, 1)
cfg := &Config{
Name: "myjob",
Exec: "true",
Expand Down

0 comments on commit 4cc285b

Please sign in to comment.