Skip to content

Commit

Permalink
feat: separate port for hook metrics (#190)
Browse files Browse the repository at this point in the history
* feat: separate port for hook metrics

- hook-metrics-listen-port flag to specify a listen port for hook metrics
- fix labels cardinality for kube_event_manager metrics
  • Loading branch information
diafour authored Jul 8, 2020
1 parent 26c78c9 commit 126d478
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 17 deletions.
6 changes: 6 additions & 0 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var TempDir = "/tmp/shell-operator"

var ListenAddress = "0.0.0.0"
var ListenPort = "9115"
var HookMetricsListenPort = ""

var PrometheusMetricsPrefix = "shell_operator_"

Expand Down Expand Up @@ -45,6 +46,11 @@ func DefineStartCommandFlags(kpApp *kingpin.Application, cmd *kingpin.CmdClause)
Default(PrometheusMetricsPrefix).
StringVar(&PrometheusMetricsPrefix)

cmd.Flag("hook-metrics-listen-port", "Port to use to serve hooks’ custom metrics to Prometheus. Can be set with $SHELL_OPERATOR_HOOK_METRICS_LISTEN_PORT. Equal to listen-port if empty.").
Envar("SHELL_OPERATOR_HOOK_METRICS_LISTEN_PORT").
Default(HookMetricsListenPort).
StringVar(&HookMetricsListenPort)

DefineKubeClientFlags(cmd)
DefineJqFlags(cmd)
DefineLoggingFlags(cmd)
Expand Down
1 change: 1 addition & 0 deletions pkg/hook/hook_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func (hm *hookManager) loadHook(hookPath string) (hook *Hook, err error) {
kubeCfg.Monitor.Metadata.MetricLabels = map[string]string{
"hook": hook.Name,
"binding": kubeCfg.BindingName,
"queue": kubeCfg.Queue,
}
}

Expand Down
35 changes: 31 additions & 4 deletions pkg/metric_storage/metric_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package metric_storage
import (
"context"
"fmt"
"net/http"
"strings"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"

"github.com/flant/shell-operator/pkg/metric_storage/operation"
Expand Down Expand Up @@ -38,16 +40,24 @@ type MetricStorage struct {
histogramsLock sync.RWMutex

GroupedVault *vault.GroupedVault

Registry *prometheus.Registry
Gatherer prometheus.Gatherer
Registerer prometheus.Registerer
}

func NewMetricStorage() *MetricStorage {
return &MetricStorage{
m := &MetricStorage{
Gauges: make(map[string]*prometheus.GaugeVec),
Counters: make(map[string]*prometheus.CounterVec),
Histograms: make(map[string]*prometheus.HistogramVec),
HistogramBuckets: make(map[string][]float64),
GroupedVault: vault.NewGroupedVault(),
Gatherer: prometheus.DefaultGatherer,
Registerer: prometheus.DefaultRegisterer,
}
m.GroupedVault.Registerer = m.Registerer
return m
}

func (m *MetricStorage) WithContext(ctx context.Context) {
Expand All @@ -58,6 +68,13 @@ func (m *MetricStorage) WithPrefix(prefix string) {
m.Prefix = prefix
}

func (m *MetricStorage) WithNewRegistry() {
m.Registry = prometheus.NewRegistry()
m.Gatherer = m.Registry
m.Registerer = m.Registry
m.GroupedVault.Registerer = m.Registry
}

func (m *MetricStorage) Stop() {
if m.cancel != nil {
m.cancel()
Expand Down Expand Up @@ -148,7 +165,7 @@ func (m *MetricStorage) RegisterGauge(metric string, labels map[string]string) *
},
LabelNames(labels),
)
prometheus.MustRegister(vec)
m.Registerer.MustRegister(vec)
m.Gauges[metric] = vec
return vec
}
Expand Down Expand Up @@ -209,7 +226,7 @@ func (m *MetricStorage) RegisterCounter(metric string, labels map[string]string)
},
LabelNames(labels),
)
prometheus.MustRegister(vec)
m.Registerer.MustRegister(vec)
m.Counters[metric] = vec
return vec
}
Expand Down Expand Up @@ -277,7 +294,7 @@ func (m *MetricStorage) RegisterHistogram(metric string, labels map[string]strin
Buckets: buckets,
}, LabelNames(labels))

prometheus.MustRegister(vec)
m.Registerer.MustRegister(vec)
m.Histograms[metric] = vec
return vec
}
Expand Down Expand Up @@ -382,3 +399,13 @@ func (m *MetricStorage) ApplyGroupOperations(group string, ops []operation.Metri
}
}
}

func (m *MetricStorage) Handler() http.Handler {
if m.Registry == nil {
return promhttp.Handler()
} else {
return promhttp.HandlerFor(m.Registry, promhttp.HandlerOpts{
Registry: m.Registry,
})
}
}
5 changes: 3 additions & 2 deletions pkg/metric_storage/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
type GroupedVault struct {
collectors map[string]ConstMetricCollector
mtx sync.Mutex
Registerer prometheus.Registerer
}

func NewGroupedVault() *GroupedVault {
Expand All @@ -38,7 +39,7 @@ func (v *GroupedVault) GetOrCreateCounterCollector(name string, labelNames []str
collector, ok := v.collectors[name]
if !ok {
collector = NewConstCounterCollector(name, labelNames)
if err := prometheus.Register(collector); err != nil {
if err := v.Registerer.Register(collector); err != nil {
return nil, fmt.Errorf("counter '%s' %v registration: %v", name, labelNames, err)
}
v.collectors[name] = collector
Expand All @@ -55,7 +56,7 @@ func (v *GroupedVault) GetOrCreateGaugeCollector(name string, labelNames []strin
collector, ok := v.collectors[name]
if !ok {
collector = NewConstGaugeCollector(name, labelNames)
if err := prometheus.Register(collector); err != nil {
if err := v.Registerer.Register(collector); err != nil {
return nil, fmt.Errorf("gauge '%s' %v registration: %v", name, labelNames, err)
}
v.collectors[name] = collector
Expand Down
1 change: 1 addition & 0 deletions pkg/metric_storage/vault/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func Test_CounterAdd(t *testing.T) {
log.SetOutput(buf)

v := NewGroupedVault()
v.Registerer = prometheus.DefaultRegisterer

v.CounterAdd("group1", "metric_total", 1.0, map[string]string{"lbl": "val"})

Expand Down
63 changes: 52 additions & 11 deletions pkg/shell-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/go-chi/chi"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
uuid "gopkg.in/satori/go.uuid.v1"

Expand Down Expand Up @@ -45,7 +44,9 @@ type ShellOperator struct {
TempDir string

MetricStorage *metric_storage.MetricStorage
KubeClient kube.KubernetesClient
// separate metric storage for hook metrics if separate listen port is configured
HookMetricStorage *metric_storage.MetricStorage
KubeClient kube.KubernetesClient

ScheduleManager schedule_manager.ScheduleManager
KubeEventsManager kube_events_manager.KubeEventsManager
Expand Down Expand Up @@ -90,11 +91,11 @@ func (op *ShellOperator) WithMetricStorage(metricStorage *metric_storage.MetricS
op.MetricStorage = metricStorage
}

// InitMetricStorage creates default MetricStorage object if not set earlier.
func (op *ShellOperator) InitMetricStorage() {
if op.MetricStorage != nil {
return
}
// Metric storage.
metricStorage := metric_storage.NewMetricStorage()
metricStorage.WithContext(op.ctx)
metricStorage.WithPrefix(app.PrometheusMetricsPrefix)
Expand All @@ -103,6 +104,20 @@ func (op *ShellOperator) InitMetricStorage() {
op.MetricStorage = metricStorage
}

// InitHookMetricStorage creates MetricStorage object
// with new registry to scrape hook metrics on separate port.
func (op *ShellOperator) InitHookMetricStorage() {
if op.HookMetricStorage != nil {
return
}
metricStorage := metric_storage.NewMetricStorage()
metricStorage.WithContext(op.ctx)
metricStorage.WithPrefix(app.PrometheusMetricsPrefix)
metricStorage.WithNewRegistry()
metricStorage.Start()
op.HookMetricStorage = metricStorage
}

// Init does some basic checks and instantiate dependencies
//
// - check directories
Expand Down Expand Up @@ -416,7 +431,7 @@ func (op *ShellOperator) TaskHandleHookRun(t task.Task) queue.TaskResult {
metrics, err := taskHook.Run(hookMeta.BindingType, hookMeta.BindingContext, hookLogLabels)

if err == nil {
err = op.MetricStorage.SendBatch(metrics, map[string]string{
err = op.HookMetricStorage.SendBatch(metrics, map[string]string{
"hook": hookMeta.HookName,
})
}
Expand Down Expand Up @@ -674,10 +689,10 @@ func (op *ShellOperator) SetupHttpServerHandles() {
</html>`, app.ListenPort)
})

http.Handle("/metrics", promhttp.Handler())
http.Handle("/metrics", op.MetricStorage.Handler())
}

func (op *ShellOperator) StartHttpServer(ip string, port string) error {
func (op *ShellOperator) StartHttpServer(ip string, port string, mux *http.ServeMux) error {
address := fmt.Sprintf("%s:%s", ip, port)

// Check if port is available
Expand All @@ -690,7 +705,7 @@ func (op *ShellOperator) StartHttpServer(ip string, port string) error {
log.Infof("Listen on %s", address)

go func() {
if err := http.Serve(listener, nil); err != nil {
if err := http.Serve(listener, mux); err != nil {
log.Errorf("Error starting HTTP server: %s", err)
os.Exit(1)
}
Expand All @@ -699,22 +714,48 @@ func (op *ShellOperator) StartHttpServer(ip string, port string) error {
return nil
}

func (op *ShellOperator) SetupHookMetricStorageAndServer() error {
if op.HookMetricStorage != nil {
return nil
}
if app.HookMetricsListenPort == "" || app.HookMetricsListenPort == app.ListenPort {
// register default prom handler in DefaultServeMux
op.HookMetricStorage = op.MetricStorage
} else {
// create new metric storage for hooks
op.InitHookMetricStorage()
// Create new ServeMux, serve on custom port
mux := http.NewServeMux()
err := op.StartHttpServer(app.ListenAddress, app.HookMetricsListenPort, mux)
if err != nil {
return err
}
// register scrape handler
mux.Handle("/metrics", op.HookMetricStorage.Handler())
}
return nil
}

func DefaultOperator() *ShellOperator {
operator := NewShellOperator()
operator.WithContext(context.Background())
return operator
}

func InitAndStart(operator *ShellOperator) error {
operator.SetupHttpServerHandles()

err := operator.StartHttpServer(app.ListenAddress, app.ListenPort)
err := operator.StartHttpServer(app.ListenAddress, app.ListenPort, http.DefaultServeMux)
if err != nil {
log.Errorf("HTTP SERVER start failed: %v", err)
return err
}

operator.InitMetricStorage()
operator.SetupHttpServerHandles()

err = operator.SetupHookMetricStorageAndServer()
if err != nil {
log.Errorf("HTTP SERVER for hook metrics start failed: %v", err)
return err
}

err = operator.Init()
if err != nil {
Expand Down

0 comments on commit 126d478

Please sign in to comment.