diff --git a/README.md b/README.md index 33b7630..6d956ee 100644 --- a/README.md +++ b/README.md @@ -204,6 +204,10 @@ INFO[2017-07-11T12:24:32+02:00] job succeeded it WARN[2017-07-11T12:24:32+02:00] job took too long to run: it should have started 1.014474099s ago job.command="sleep 2" job.position=0 job.schedule="* * * * * * *" ``` +You can optionally disable this behavior and allow overlapping instances of +your jobs by passing the `-overlapping` flag to Supercronic. Supercronic will +still warn about jobs falling behind, but will run duplicate instances of them. + ## Reload crontab diff --git a/cron/cron.go b/cron/cron.go index 6bc3b06..324bc31 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -106,7 +106,7 @@ func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry) e return nil } -func monitorJob(ctx context.Context, expression crontab.Expression, t0 time.Time, jobLogger *logrus.Entry) { +func monitorJob(ctx context.Context, expression crontab.Expression, t0 time.Time, jobLogger *logrus.Entry, overlapping bool) { t := t0 for { @@ -114,63 +114,89 @@ func monitorJob(ctx context.Context, expression crontab.Expression, t0 time.Time select { case <-time.After(time.Until(t)): - jobLogger.Warnf("not starting: job is still running since %s (%s elapsed)", t0, t.Sub(t0)) + m := "not starting" + if overlapping { + m = "overlapping jobs" + } + + jobLogger.Warnf("%s: job is still running since %s (%s elapsed)", m, t0, t.Sub(t0)) case <-ctx.Done(): return } } } -func StartJob(wg *sync.WaitGroup, cronCtx *crontab.Context, job *crontab.Job, exitCtx context.Context, cronLogger *logrus.Entry) { +func startFunc(wg *sync.WaitGroup, exitCtx context.Context, logger *logrus.Entry, overlapping bool, expression crontab.Expression, fn func(time.Time, *logrus.Entry)) { wg.Add(1) go func() { defer wg.Done() - var cronIteration uint64 = 0 + var jobWg sync.WaitGroup + defer jobWg.Wait() + + var cronIteration uint64 nextRun := time.Now() - // NOTE: this (intentionally) does not run multiple instances of the - // job concurrently + // NOTE: if overlapping is disabled (default), this does not run multiple + // instances of the job concurrently for { - nextRun = job.Expression.Next(nextRun) - cronLogger.Debugf("job will run next at %v", nextRun) + nextRun = expression.Next(nextRun) + logger.Debugf("job will run next at %v", nextRun) delay := nextRun.Sub(time.Now()) if delay < 0 { - cronLogger.Warningf("job took too long to run: it should have started %v ago", -delay) + logger.Warningf("job took too long to run: it should have started %v ago", -delay) nextRun = time.Now() continue } select { case <-exitCtx.Done(): - cronLogger.Debug("shutting down") + logger.Debug("shutting down") return case <-time.After(delay): // Proceed normally } - jobLogger := cronLogger.WithFields(logrus.Fields{ - "iteration": cronIteration, - }) + jobWg.Add(1) - err := func() error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + runThisJob := func(cronIteration uint64) { + defer jobWg.Done() - go monitorJob(ctx, job.Expression, nextRun, jobLogger) + jobLogger := logger.WithFields(logrus.Fields{ + "iteration": cronIteration, + }) - return runJob(cronCtx, job.Command, jobLogger) - }() + fn(nextRun, jobLogger) + } - if err == nil { - jobLogger.Info("job succeeded") + if overlapping { + go runThisJob(cronIteration) } else { - jobLogger.Error(err) + runThisJob(cronIteration) } cronIteration++ } }() } + +func StartJob(wg *sync.WaitGroup, cronCtx *crontab.Context, job *crontab.Job, exitCtx context.Context, cronLogger *logrus.Entry, overlapping bool) { + runThisJob := func(t0 time.Time, jobLogger *logrus.Entry) { + monitorCtx, cancelMonitor := context.WithCancel(context.Background()) + defer cancelMonitor() + + go monitorJob(monitorCtx, job.Expression, t0, jobLogger, overlapping) + + err := runJob(cronCtx, job.Command, jobLogger) + + if err == nil { + jobLogger.Info("job succeeded") + } else { + jobLogger.Error(err) + } + } + + startFunc(wg, exitCtx, cronLogger, overlapping, job.Expression, runThisJob) +} diff --git a/cron/cron_test.go b/cron/cron_test.go index 2477c6a..7abaddd 100644 --- a/cron/cron_test.go +++ b/cron/cron_test.go @@ -3,15 +3,17 @@ package cron import ( "context" "fmt" - "github.com/aptible/supercronic/crontab" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" "io/ioutil" "regexp" "strings" "sync" "testing" "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + + "github.com/aptible/supercronic/crontab" ) var ( @@ -193,7 +195,7 @@ func TestStartJobExitsOnRequest(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - StartJob(&wg, &basicContext, &job, ctx, logger) + StartJob(&wg, &basicContext, &job, ctx, logger, false) wg.Wait() } @@ -213,7 +215,7 @@ func TestStartJobRunsJob(t *testing.T) { logger, channel := newTestLogger() - StartJob(&wg, &basicContext, &job, ctx, logger) + StartJob(&wg, &basicContext, &job, ctx, logger, false) select { case entry := <-channel: @@ -260,3 +262,126 @@ func TestStartJobRunsJob(t *testing.T) { cancel() wg.Wait() } + +func TestStartFuncWaitsForCompletion(t *testing.T) { + // We use startFunc to start a function, wait for it to start, then + // tell the whole thing to exit, and verify that it waits for the + // function to finish. + expr := &testExpression{10 * time.Millisecond} + + var wg sync.WaitGroup + logger, _ := newTestLogger() + + ctxStartFunc, cancelStartFunc := context.WithCancel(context.Background()) + ctxAllDone, allDone := context.WithCancel(context.Background()) + + ctxStep1, step1Done := context.WithCancel(context.Background()) + ctxStep2, step2Done := context.WithCancel(context.Background()) + + testFn := func(t0 time.Time, jobLogger *logrus.Entry) { + step1Done() + <-ctxStep2.Done() + } + + startFunc(&wg, ctxStartFunc, logger, false, expr, testFn) + go func() { + wg.Wait() + allDone() + }() + + select { + case <-ctxStep1.Done(): + case <-time.After(time.Second): + t.Fatalf("timed out waiting for testFn to start") + } + + cancelStartFunc() + + select { + case <-ctxAllDone.Done(): + t.Fatalf("wg completed before jobs finished") + case <-time.After(time.Second): + } + + step2Done() + + select { + case <-ctxAllDone.Done(): + case <-time.After(time.Second): + t.Fatalf("wg did not complete after jobs finished") + } +} + +func TestStartFuncDoesNotRunOverlappingJobs(t *testing.T) { + // We kick off a function that does not terminate. We expect to see it + // run only once. + + expr := &testExpression{10 * time.Millisecond} + + testChan := make(chan interface{}, TEST_CHANNEL_BUFFER_SIZE) + + var wg sync.WaitGroup + logger, _ := newTestLogger() + + ctxStartFunc, cancelStartFunc := context.WithCancel(context.Background()) + ctxAllDone, allDone := context.WithCancel(context.Background()) + + testFn := func(t0 time.Time, jobLogger *logrus.Entry) { + testChan <- nil + <-ctxAllDone.Done() + } + + startFunc(&wg, ctxStartFunc, logger, false, expr, testFn) + + select { + case <-testChan: + case <-time.After(time.Second): + t.Fatalf("fn did not run") + } + + select { + case <-testChan: + t.Fatalf("fn instances overlapped") + case <-time.After(time.Second): + } + + cancelStartFunc() + allDone() + + wg.Wait() +} + +func TestStartFuncRunsOverlappingJobs(t *testing.T) { + // We kick off a bunch of functions that never terminate, and expect to + // still see multiple iterations + + expr := &testExpression{10 * time.Millisecond} + + testChan := make(chan interface{}, TEST_CHANNEL_BUFFER_SIZE) + + var wg sync.WaitGroup + logger, _ := newTestLogger() + + ctxStartFunc, cancelStartFunc := context.WithCancel(context.Background()) + ctxAllDone, allDone := context.WithCancel(context.Background()) + + testFn := func(t0 time.Time, jobLogger *logrus.Entry) { + testChan <- nil + <-ctxAllDone.Done() + } + + startFunc(&wg, ctxStartFunc, logger, true, expr, testFn) + + for i := 0; i < 5; i++ { + select { + case <-testChan: + case <-time.After(time.Second): + t.Fatalf("fn instances did not overlap") + } + } + + cancelStartFunc() + allDone() + + wg.Wait() +} diff --git a/crontab/crontab.go b/crontab/crontab.go index 283a75d..d3a683f 100644 --- a/crontab/crontab.go +++ b/crontab/crontab.go @@ -3,11 +3,12 @@ package crontab import ( "bufio" "fmt" - "github.com/gorhill/cronexpr" - "github.com/sirupsen/logrus" "io" "regexp" "strings" + + "github.com/gorhill/cronexpr" + "github.com/sirupsen/logrus" ) var ( diff --git a/crontab/crontab_test.go b/crontab/crontab_test.go index 73045f8..abca293 100644 --- a/crontab/crontab_test.go +++ b/crontab/crontab_test.go @@ -3,8 +3,9 @@ package crontab import ( "bytes" "fmt" - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) var parseCrontabTestCases = []struct { diff --git a/integration/test.bats b/integration/test.bats index 26af397..9abaa53 100644 --- a/integration/test.bats +++ b/integration/test.bats @@ -57,6 +57,11 @@ wait_for() { [[ "$n" -eq 2 ]] } +@test "it runs overlapping jobs" { + n="$(SUPERCRONIC_ARGS="-overlapping" run_supercronic "${BATS_TEST_DIRNAME}/timeout.crontab" 5s | grep -iE "starting" | wc -l)" + [[ "$n" -ge 4 ]] +} + @test "it supports debug logging " { SUPERCRONIC_ARGS="-debug" run_supercronic "${BATS_TEST_DIRNAME}/hello.crontab" | grep -iE "debug" } diff --git a/main.go b/main.go index 13142e5..377a146 100644 --- a/main.go +++ b/main.go @@ -28,6 +28,7 @@ func main() { splitLogs := flag.Bool("split-logs", false, "split log output into stdout/stderr") sentry := flag.String("sentry-dsn", "", "enable Sentry error logging, using provided DSN") sentryAlias := flag.String("sentryDsn", "", "alias for sentry-dsn") + overlapping := flag.Bool("overlapping", false, "enable tasks overlapping") flag.Parse() var sentryDsn string @@ -111,7 +112,7 @@ func main() { "job.position": job.Position, }) - cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger) + cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping) } termChan := make(chan os.Signal, 1)