Skip to content

Commit

Permalink
Merge pull request #18 from pavel-popov/overlapping
Browse files Browse the repository at this point in the history
Add overlapping flag
  • Loading branch information
krallin authored Feb 8, 2019
2 parents 328679f + e7ecf3f commit de0b58f
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 31 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ INFO[2017-07-11T12:24:32+02:00] job succeeded it
WARN[2017-07-11T12:24:32+02:00] job took too long to run: it should have started 1.014474099s ago job.command="sleep 2" job.position=0 job.schedule="* * * * * * *"
```

You can optionally disable this behavior and allow overlapping instances of
your jobs by passing the `-overlapping` flag to Supercronic. Supercronic will
still warn about jobs falling behind, but will run duplicate instances of them.


## Reload crontab

Expand Down
70 changes: 48 additions & 22 deletions cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,71 +106,97 @@ func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry) e
return nil
}

func monitorJob(ctx context.Context, expression crontab.Expression, t0 time.Time, jobLogger *logrus.Entry) {
func monitorJob(ctx context.Context, expression crontab.Expression, t0 time.Time, jobLogger *logrus.Entry, overlapping bool) {
t := t0

for {
t = expression.Next(t)

select {
case <-time.After(time.Until(t)):
jobLogger.Warnf("not starting: job is still running since %s (%s elapsed)", t0, t.Sub(t0))
m := "not starting"
if overlapping {
m = "overlapping jobs"
}

jobLogger.Warnf("%s: job is still running since %s (%s elapsed)", m, t0, t.Sub(t0))
case <-ctx.Done():
return
}
}
}

func StartJob(wg *sync.WaitGroup, cronCtx *crontab.Context, job *crontab.Job, exitCtx context.Context, cronLogger *logrus.Entry) {
func startFunc(wg *sync.WaitGroup, exitCtx context.Context, logger *logrus.Entry, overlapping bool, expression crontab.Expression, fn func(time.Time, *logrus.Entry)) {
wg.Add(1)

go func() {
defer wg.Done()

var cronIteration uint64 = 0
var jobWg sync.WaitGroup
defer jobWg.Wait()

var cronIteration uint64
nextRun := time.Now()

// NOTE: this (intentionally) does not run multiple instances of the
// job concurrently
// NOTE: if overlapping is disabled (default), this does not run multiple
// instances of the job concurrently
for {
nextRun = job.Expression.Next(nextRun)
cronLogger.Debugf("job will run next at %v", nextRun)
nextRun = expression.Next(nextRun)
logger.Debugf("job will run next at %v", nextRun)

delay := nextRun.Sub(time.Now())
if delay < 0 {
cronLogger.Warningf("job took too long to run: it should have started %v ago", -delay)
logger.Warningf("job took too long to run: it should have started %v ago", -delay)
nextRun = time.Now()
continue
}

select {
case <-exitCtx.Done():
cronLogger.Debug("shutting down")
logger.Debug("shutting down")
return
case <-time.After(delay):
// Proceed normally
}

jobLogger := cronLogger.WithFields(logrus.Fields{
"iteration": cronIteration,
})
jobWg.Add(1)

err := func() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
runThisJob := func(cronIteration uint64) {
defer jobWg.Done()

go monitorJob(ctx, job.Expression, nextRun, jobLogger)
jobLogger := logger.WithFields(logrus.Fields{
"iteration": cronIteration,
})

return runJob(cronCtx, job.Command, jobLogger)
}()
fn(nextRun, jobLogger)
}

if err == nil {
jobLogger.Info("job succeeded")
if overlapping {
go runThisJob(cronIteration)
} else {
jobLogger.Error(err)
runThisJob(cronIteration)
}

cronIteration++
}
}()
}

func StartJob(wg *sync.WaitGroup, cronCtx *crontab.Context, job *crontab.Job, exitCtx context.Context, cronLogger *logrus.Entry, overlapping bool) {
runThisJob := func(t0 time.Time, jobLogger *logrus.Entry) {
monitorCtx, cancelMonitor := context.WithCancel(context.Background())
defer cancelMonitor()

go monitorJob(monitorCtx, job.Expression, t0, jobLogger, overlapping)

err := runJob(cronCtx, job.Command, jobLogger)

if err == nil {
jobLogger.Info("job succeeded")
} else {
jobLogger.Error(err)
}
}

startFunc(wg, exitCtx, cronLogger, overlapping, job.Expression, runThisJob)
}
135 changes: 130 additions & 5 deletions cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ package cron
import (
"context"
"fmt"
"github.com/aptible/supercronic/crontab"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"io/ioutil"
"regexp"
"strings"
"sync"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"

"github.com/aptible/supercronic/crontab"
)

var (
Expand Down Expand Up @@ -193,7 +195,7 @@ func TestStartJobExitsOnRequest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

StartJob(&wg, &basicContext, &job, ctx, logger)
StartJob(&wg, &basicContext, &job, ctx, logger, false)

wg.Wait()
}
Expand All @@ -213,7 +215,7 @@ func TestStartJobRunsJob(t *testing.T) {

logger, channel := newTestLogger()

StartJob(&wg, &basicContext, &job, ctx, logger)
StartJob(&wg, &basicContext, &job, ctx, logger, false)

select {
case entry := <-channel:
Expand Down Expand Up @@ -260,3 +262,126 @@ func TestStartJobRunsJob(t *testing.T) {
cancel()
wg.Wait()
}

func TestStartFuncWaitsForCompletion(t *testing.T) {
// We use startFunc to start a function, wait for it to start, then
// tell the whole thing to exit, and verify that it waits for the
// function to finish.
expr := &testExpression{10 * time.Millisecond}

var wg sync.WaitGroup
logger, _ := newTestLogger()

ctxStartFunc, cancelStartFunc := context.WithCancel(context.Background())
ctxAllDone, allDone := context.WithCancel(context.Background())

ctxStep1, step1Done := context.WithCancel(context.Background())
ctxStep2, step2Done := context.WithCancel(context.Background())

testFn := func(t0 time.Time, jobLogger *logrus.Entry) {
step1Done()
<-ctxStep2.Done()
}

startFunc(&wg, ctxStartFunc, logger, false, expr, testFn)
go func() {
wg.Wait()
allDone()
}()

select {
case <-ctxStep1.Done():
case <-time.After(time.Second):
t.Fatalf("timed out waiting for testFn to start")
}

cancelStartFunc()

select {
case <-ctxAllDone.Done():
t.Fatalf("wg completed before jobs finished")
case <-time.After(time.Second):
}

step2Done()

select {
case <-ctxAllDone.Done():
case <-time.After(time.Second):
t.Fatalf("wg did not complete after jobs finished")
}
}

func TestStartFuncDoesNotRunOverlappingJobs(t *testing.T) {
// We kick off a function that does not terminate. We expect to see it
// run only once.

expr := &testExpression{10 * time.Millisecond}

testChan := make(chan interface{}, TEST_CHANNEL_BUFFER_SIZE)

var wg sync.WaitGroup
logger, _ := newTestLogger()

ctxStartFunc, cancelStartFunc := context.WithCancel(context.Background())
ctxAllDone, allDone := context.WithCancel(context.Background())

testFn := func(t0 time.Time, jobLogger *logrus.Entry) {
testChan <- nil
<-ctxAllDone.Done()
}

startFunc(&wg, ctxStartFunc, logger, false, expr, testFn)

select {
case <-testChan:
case <-time.After(time.Second):
t.Fatalf("fn did not run")
}

select {
case <-testChan:
t.Fatalf("fn instances overlapped")
case <-time.After(time.Second):
}

cancelStartFunc()
allDone()

wg.Wait()
}

func TestStartFuncRunsOverlappingJobs(t *testing.T) {
// We kick off a bunch of functions that never terminate, and expect to
// still see multiple iterations

expr := &testExpression{10 * time.Millisecond}

testChan := make(chan interface{}, TEST_CHANNEL_BUFFER_SIZE)

var wg sync.WaitGroup
logger, _ := newTestLogger()

ctxStartFunc, cancelStartFunc := context.WithCancel(context.Background())
ctxAllDone, allDone := context.WithCancel(context.Background())

testFn := func(t0 time.Time, jobLogger *logrus.Entry) {
testChan <- nil
<-ctxAllDone.Done()
}

startFunc(&wg, ctxStartFunc, logger, true, expr, testFn)

for i := 0; i < 5; i++ {
select {
case <-testChan:
case <-time.After(time.Second):
t.Fatalf("fn instances did not overlap")
}
}

cancelStartFunc()
allDone()

wg.Wait()
}
5 changes: 3 additions & 2 deletions crontab/crontab.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package crontab
import (
"bufio"
"fmt"
"github.com/gorhill/cronexpr"
"github.com/sirupsen/logrus"
"io"
"regexp"
"strings"

"github.com/gorhill/cronexpr"
"github.com/sirupsen/logrus"
)

var (
Expand Down
3 changes: 2 additions & 1 deletion crontab/crontab_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package crontab
import (
"bytes"
"fmt"
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"
)

var parseCrontabTestCases = []struct {
Expand Down
5 changes: 5 additions & 0 deletions integration/test.bats
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ wait_for() {
[[ "$n" -eq 2 ]]
}

@test "it runs overlapping jobs" {
n="$(SUPERCRONIC_ARGS="-overlapping" run_supercronic "${BATS_TEST_DIRNAME}/timeout.crontab" 5s | grep -iE "starting" | wc -l)"
[[ "$n" -ge 4 ]]
}

@test "it supports debug logging " {
SUPERCRONIC_ARGS="-debug" run_supercronic "${BATS_TEST_DIRNAME}/hello.crontab" | grep -iE "debug"
}
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func main() {
splitLogs := flag.Bool("split-logs", false, "split log output into stdout/stderr")
sentry := flag.String("sentry-dsn", "", "enable Sentry error logging, using provided DSN")
sentryAlias := flag.String("sentryDsn", "", "alias for sentry-dsn")
overlapping := flag.Bool("overlapping", false, "enable tasks overlapping")
flag.Parse()

var sentryDsn string
Expand Down Expand Up @@ -111,7 +112,7 @@ func main() {
"job.position": job.Position,
})

cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger)
cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping)
}

termChan := make(chan os.Signal, 1)
Expand Down

0 comments on commit de0b58f

Please sign in to comment.