Skip to content

Commit

Permalink
Merge pull request #51 from sevagh/master
Browse files Browse the repository at this point in the history
Add Prometheus instrumentation
  • Loading branch information
krallin authored Jun 7, 2019
2 parents 51785e0 + ba71588 commit 2275476
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 15 deletions.
69 changes: 69 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@
[[constraint]]
name = "github.com/stretchr/testify"
version = "~1.1.4"

[[constraint]]
branch = "master"
name = "github.com/prometheus/client_golang"
43 changes: 37 additions & 6 deletions cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ import (
"bufio"
"context"
"fmt"
"github.com/aptible/supercronic/crontab"
"github.com/sirupsen/logrus"
"io"
"os"
"os/exec"
"strings"
"sync"
"syscall"
"time"

"github.com/aptible/supercronic/crontab"
"github.com/aptible/supercronic/prometheus_metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

var (
Expand Down Expand Up @@ -106,11 +109,11 @@ func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry) e
return nil
}

func monitorJob(ctx context.Context, expression crontab.Expression, t0 time.Time, jobLogger *logrus.Entry, overlapping bool) {
func monitorJob(ctx context.Context, job *crontab.Job, t0 time.Time, jobLogger *logrus.Entry, overlapping bool, promMetrics *prometheus_metrics.PrometheusMetrics) {
t := t0

for {
t = expression.Next(t)
t = job.Expression.Next(t)

select {
case <-time.After(time.Until(t)):
Expand All @@ -120,6 +123,8 @@ func monitorJob(ctx context.Context, expression crontab.Expression, t0 time.Time
}

jobLogger.Warnf("%s: job is still running since %s (%s elapsed)", m, t0, t.Sub(t0))

promMetrics.CronsDeadlineExceededCounter.With(jobPromLabels(job)).Inc()
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -182,21 +187,47 @@ func startFunc(wg *sync.WaitGroup, exitCtx context.Context, logger *logrus.Entry
}()
}

func StartJob(wg *sync.WaitGroup, cronCtx *crontab.Context, job *crontab.Job, exitCtx context.Context, cronLogger *logrus.Entry, overlapping bool) {
func StartJob(wg *sync.WaitGroup, cronCtx *crontab.Context, job *crontab.Job, exitCtx context.Context, cronLogger *logrus.Entry, overlapping bool, promMetrics *prometheus_metrics.PrometheusMetrics) {
runThisJob := func(t0 time.Time, jobLogger *logrus.Entry) {
promMetrics.CronsCurrentlyRunningGauge.With(jobPromLabels(job)).Inc()

defer func() {
promMetrics.CronsCurrentlyRunningGauge.With(jobPromLabels(job)).Dec()
}()

monitorCtx, cancelMonitor := context.WithCancel(context.Background())
defer cancelMonitor()

go monitorJob(monitorCtx, job.Expression, t0, jobLogger, overlapping)
go monitorJob(monitorCtx, job, t0, jobLogger, overlapping, promMetrics)

timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
promMetrics.CronsExecutionTimeHistogram.With(jobPromLabels(job)).Observe(v)
}))

defer timer.ObserveDuration()

err := runJob(cronCtx, job.Command, jobLogger)

promMetrics.CronsExecCounter.With(jobPromLabels(job)).Inc()

if err == nil {
jobLogger.Info("job succeeded")

promMetrics.CronsSuccessCounter.With(jobPromLabels(job)).Inc()
} else {
jobLogger.Error(err)

promMetrics.CronsFailCounter.With(jobPromLabels(job)).Inc()
}
}

startFunc(wg, exitCtx, cronLogger, overlapping, job.Expression, runThisJob)
}

func jobPromLabels(job *crontab.Job) prometheus.Labels {
return prometheus.Labels{
"position": fmt.Sprintf("%d", job.Position),
"command": job.Command,
"schedule": job.Schedule,
}
}
6 changes: 4 additions & 2 deletions cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ import (
"github.com/stretchr/testify/assert"

"github.com/aptible/supercronic/crontab"
"github.com/aptible/supercronic/prometheus_metrics"
)

var (
TEST_CHANNEL_BUFFER_SIZE = 100
PROM_METRICS = prometheus_metrics.NewPrometheusMetrics()
)

type testHook struct {
Expand Down Expand Up @@ -195,7 +197,7 @@ func TestStartJobExitsOnRequest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

StartJob(&wg, &basicContext, &job, ctx, logger, false)
StartJob(&wg, &basicContext, &job, ctx, logger, false, &PROM_METRICS)

wg.Wait()
}
Expand All @@ -215,7 +217,7 @@ func TestStartJobRunsJob(t *testing.T) {

logger, channel := newTestLogger()

StartJob(&wg, &basicContext, &job, ctx, logger, false)
StartJob(&wg, &basicContext, &job, ctx, logger, false, &PROM_METRICS)

select {
case entry := <-channel:
Expand Down
33 changes: 26 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ import (
"context"
"flag"
"fmt"
"github.com/aptible/supercronic/cron"
"github.com/aptible/supercronic/crontab"
"github.com/aptible/supercronic/log/hook"
"github.com/evalphobia/logrus_sentry"
"github.com/sirupsen/logrus"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/aptible/supercronic/cron"
"github.com/aptible/supercronic/crontab"
"github.com/aptible/supercronic/log/hook"
"github.com/aptible/supercronic/prometheus_metrics"
"github.com/evalphobia/logrus_sentry"
"github.com/sirupsen/logrus"
)

var Usage = func() {
Expand All @@ -25,6 +27,7 @@ func main() {
debug := flag.Bool("debug", false, "enable debug logging")
json := flag.Bool("json", false, "enable JSON logging")
test := flag.Bool("test", false, "test crontab (does not run jobs)")
prometheusListen := flag.String("prometheus-listen-address", "", "give a valid ip:port address to expose Prometheus metrics at /metrics")
splitLogs := flag.Bool("split-logs", false, "split log output into stdout/stderr")
sentry := flag.String("sentry-dsn", "", "enable Sentry error logging, using provided DSN")
sentryAlias := flag.String("sentryDsn", "", "alias for sentry-dsn")
Expand All @@ -50,7 +53,6 @@ func main() {
} else {
logrus.SetFormatter(&logrus.TextFormatter{FullTimestamp: true})
}

if *splitLogs {
hook.RegisterSplitLogger(
logrus.StandardLogger(),
Expand Down Expand Up @@ -87,7 +89,24 @@ func main() {
}
}

promMetrics := prometheus_metrics.NewPrometheusMetrics()

if *prometheusListen != "" {
promServerShutdownClosure, err := prometheus_metrics.InitHTTPServer(*prometheusListen, context.Background())
if err != nil {
logrus.Fatalf("prometheus http startup failed: %s", err.Error())
}

defer func() {
if err := promServerShutdownClosure(); err != nil {
logrus.Fatalf("prometheus http shutdown failed: %s", err.Error())
}
}()
}

for true {
promMetrics.Reset()

logrus.Infof("read crontab: %s", crontabFileName)
tab, err := readCrontabAtPath(crontabFileName)

Expand All @@ -112,7 +131,7 @@ func main() {
"job.position": job.Position,
})

cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping)
cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping, &promMetrics)
}

termChan := make(chan os.Signal, 1)
Expand Down
Loading

0 comments on commit 2275476

Please sign in to comment.