From 902c22a59a8a1fc78ef0c694b7c7cb3282d93d10 Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Tue, 3 Dec 2024 15:17:17 +0100 Subject: [PATCH] add prometheus operational metrics --- ant.go | 1 + cmd/ants/main.go | 38 ++++++++++++- db/client.go | 20 ++++++- .../000001_create_requests_table.up.sql | 9 ++- db/models.go | 1 - metrics/telemetry.go | 56 +++++++++++++------ queen.go | 28 ++++++++-- 7 files changed, 121 insertions(+), 32 deletions(-) diff --git a/ant.go b/ant.go index e0b04e6..d859ed4 100644 --- a/ant.go +++ b/ant.go @@ -88,6 +88,7 @@ func SpawnAnt(ctx context.Context, ps peerstore.Peerstore, ds ds.Batching, cfg * libp2p.Peerstore(ps), libp2p.DisableRelay(), libp2p.ListenAddrStrings(listenAddrs...), + libp2p.DisableMetrics(), } if cfg.Port == 0 { diff --git a/cmd/ants/main.go b/cmd/ants/main.go index 0afe484..665824c 100644 --- a/cmd/ants/main.go +++ b/cmd/ants/main.go @@ -9,14 +9,19 @@ import ( "time" logging "github.com/ipfs/go-log/v2" + "github.com/urfave/cli/v2" + "go.opentelemetry.io/otel/trace/noop" + "github.com/probe-lab/ants-watch" "github.com/probe-lab/ants-watch/db" - "github.com/urfave/cli/v2" + "github.com/probe-lab/ants-watch/metrics" ) var logger = logging.Logger("ants-queen") var rootConfig = struct { + MetricsHost string + MetricsPort int ClickhouseAddress string ClickhouseDatabase string ClickhouseUsername string @@ -36,6 +41,8 @@ var rootConfig = struct { ProtocolPrefix string QueenID string }{ + MetricsHost: "127.0.0.1", + MetricsPort: 5999, // one below the FirstPort to not accidentally override it ClickhouseAddress: "", ClickhouseDatabase: "", ClickhouseUsername: "", @@ -68,6 +75,20 @@ func main() { Name: "queen", Usage: "Starts the queen service", Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "metrics.host", + Usage: "On which host to expose the metrics", + EnvVars: []string{"ANTS_METRICS_HOST"}, + Destination: &rootConfig.MetricsHost, + Value: rootConfig.MetricsHost, + }, + &cli.IntFlag{ + Name: "metrics.port", + Usage: "On which port to expose the metrics", + EnvVars: []string{"ANTS_METRICS_PORT"}, + Destination: &rootConfig.MetricsPort, + Value: rootConfig.MetricsPort, + }, &cli.StringFlag{ Name: "clickhouse.address", Usage: "ClickHouse address containing the host and port, 127.0.0.1:9000", @@ -216,6 +237,19 @@ func main() { func runQueenCommand(c *cli.Context) error { ctx := c.Context + meterProvider, err := metrics.NewMeterProvider() + if err != nil { + return fmt.Errorf("init meter provider: %w", err) + } + + telemetry, err := metrics.NewTelemetry(noop.NewTracerProvider(), meterProvider) + if err != nil { + return fmt.Errorf("init telemetry: %w", err) + } + + logger.Debugln("Starting metrics server", "host", rootConfig.MetricsHost, "port", rootConfig.MetricsPort) + go metrics.ListenAndServe(rootConfig.MetricsHost, rootConfig.MetricsPort) + // initializing a new clickhouse client client, err := db.NewClient( rootConfig.ClickhouseAddress, @@ -223,6 +257,7 @@ func runQueenCommand(c *cli.Context) error { rootConfig.ClickhouseUsername, rootConfig.ClickhousePassword, rootConfig.ClickhouseSSL, + telemetry, ) if err != nil { return fmt.Errorf("init database client: %w", err) @@ -247,6 +282,7 @@ func runQueenCommand(c *cli.Context) error { NebulaDBConnString: rootConfig.NebulaDBConnString, BucketSize: rootConfig.BucketSize, UserAgent: rootConfig.UserAgent, + Telemetry: telemetry, } // initializing queen diff --git a/db/client.go b/db/client.go index edc3eac..0d78743 100644 --- a/db/client.go +++ b/db/client.go @@ -5,6 +5,12 @@ import ( "crypto/tls" "fmt" "net" + "strconv" + "time" + + "github.com/probe-lab/ants-watch/metrics" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" @@ -21,11 +27,12 @@ type Client interface { type ClickhouseClient struct { driver.Conn + telemetry *metrics.Telemetry } var _ Client = (*ClickhouseClient)(nil) -func NewClient(address, database, username, password string, ssl bool) (*ClickhouseClient, error) { +func NewClient(address, database, username, password string, ssl bool, telemetry *metrics.Telemetry) (*ClickhouseClient, error) { logger.Infoln("Creating new clickhouse client...") conn, err := clickhouse.Open(&clickhouse.Options{ @@ -51,13 +58,20 @@ func NewClient(address, database, username, password string, ssl bool) (*Clickho } client := &ClickhouseClient{ - Conn: conn, + Conn: conn, + telemetry: telemetry, } return client, nil } -func (c *ClickhouseClient) BulkInsertRequests(ctx context.Context, requests []*Request) error { +func (c *ClickhouseClient) BulkInsertRequests(ctx context.Context, requests []*Request) (err error) { + start := time.Now() + defer func() { + c.telemetry.BulkInsertCounter.Add(ctx, 1, metric.WithAttributes(attribute.String("success", strconv.FormatBool(err == nil)))) + c.telemetry.BulkInsertSizeHist.Record(ctx, int64(len(requests))) + c.telemetry.BulkInsertLatencyMsHist.Record(ctx, time.Since(start).Milliseconds()) + }() batch, err := c.Conn.PrepareBatch(ctx, "INSERT INTO requests", driver.WithReleaseConnection()) if err != nil { return fmt.Errorf("prepare batch: %w", err) diff --git a/db/migrations/000001_create_requests_table.up.sql b/db/migrations/000001_create_requests_table.up.sql index 8743a4a..f12da9d 100644 --- a/db/migrations/000001_create_requests_table.up.sql +++ b/db/migrations/000001_create_requests_table.up.sql @@ -6,11 +6,10 @@ CREATE TABLE requests remote_multihash String, agent_version String, protocols Array(String), - started_at DateTime, + started_at DateTime64(3), request_type String, key_multihash String, - multi_addresses Array(String), - is_self_lookup bool -) ENGINE = ReplicatedMergeTree + multi_addresses Array(String) +) ENGINE = MergeTree PRIMARY KEY (started_at) -TTL started_at + INTERVAL 1 DAY; +TTL toDateTime(started_at) + INTERVAL 1 DAY; diff --git a/db/models.go b/db/models.go index c18042a..513b2fc 100644 --- a/db/models.go +++ b/db/models.go @@ -19,5 +19,4 @@ type Request struct { StartedAt time.Time KeyID string MultiAddresses []string - IsSelfLookup bool } diff --git a/metrics/telemetry.go b/metrics/telemetry.go index 2e1ad38..9353b6d 100644 --- a/metrics/telemetry.go +++ b/metrics/telemetry.go @@ -8,10 +8,9 @@ import ( "fmt" "net/http" _ "net/http/pprof" - "runtime" + "github.com/ipfs/go-log/v2" "github.com/prometheus/client_golang/prometheus/promhttp" - log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/metric" @@ -24,34 +23,57 @@ import ( "go.uber.org/atomic" ) +var logger = log.Logger("telemetry") + const ( MeterName = "github.com/probe-lab/ants-watch" TracerName = "github.com/probe-lab/ants-watch" ) type Telemetry struct { - Tracer trace.Tracer - CacheQueriesCount metric.Int64Counter - InsertRequestHistogram metric.Int64Histogram + Tracer trace.Tracer + AntsCountGauge metric.Int64Gauge + BulkInsertCounter metric.Int64Counter + BulkInsertSizeHist metric.Int64Histogram + BulkInsertLatencyMsHist metric.Int64Histogram + CacheHitCounter metric.Int64Counter } func NewTelemetry(tp trace.TracerProvider, mp metric.MeterProvider) (*Telemetry, error) { meter := mp.Meter(MeterName) - cacheQueriesCount, err := meter.Int64Counter("cache_queries", metric.WithDescription("Number of queries to the LRU caches")) + antsCountGauge, err := meter.Int64Gauge("ants_count", metric.WithDescription("Number of running ants")) + if err != nil { + return nil, fmt.Errorf("ants_count gauge: %w", err) + } + + bulkInsertCounter, err := meter.Int64Counter("bulk_insert_count", metric.WithDescription("Number of bulk inserts")) if err != nil { - return nil, fmt.Errorf("cache_queries counter: %w", err) + return nil, fmt.Errorf("bulk_insert_count gauge: %w", err) } - insertRequestHistogram, err := meter.Int64Histogram("insert_request_timing", metric.WithDescription("Histogram of database query times for request insertions"), metric.WithUnit("milliseconds")) + bulkInsertSizeHist, err := meter.Int64Histogram("bulk_insert_size", metric.WithDescription("Size of bulk inserts"), metric.WithExplicitBucketBoundaries(0, 10, 50, 100, 500, 1000)) if err != nil { - return nil, fmt.Errorf("cache_queries counter: %w", err) + return nil, fmt.Errorf("bulk_insert_size histogram: %w", err) + } + + bulkInsertLatencyMsHist, err := meter.Int64Histogram("bulk_insert_latency", metric.WithDescription("Latency of bulk inserts (ms)"), metric.WithUnit("ms")) + if err != nil { + return nil, fmt.Errorf("bulk_insert_latency histogram: %w", err) + } + + cacheHitCounter, err := meter.Int64Counter("cache_hit_count", metric.WithDescription("Number of cache hits")) + if err != nil { + return nil, fmt.Errorf("cache_hit_counter gauge: %w", err) } return &Telemetry{ - Tracer: tp.Tracer(TracerName), - CacheQueriesCount: cacheQueriesCount, - InsertRequestHistogram: insertRequestHistogram, + Tracer: tp.Tracer(TracerName), + BulkInsertCounter: bulkInsertCounter, + BulkInsertSizeHist: bulkInsertSizeHist, + BulkInsertLatencyMsHist: bulkInsertLatencyMsHist, + CacheHitCounter: cacheHitCounter, + AntsCountGauge: antsCountGauge, }, nil } @@ -103,14 +125,11 @@ func NewTracerProvider(ctx context.Context, host string, port int) (trace.Tracer // `ants health`. func ListenAndServe(host string, port int) { addr := fmt.Sprintf("%s:%d", host, port) - log.WithField("addr", addr).Debugln("Starting telemetry endpoint") - - // profile 1% of contention events - runtime.SetMutexProfileFraction(1) + logger.Debugln("Starting telemetry endpoint", "addr", addr) http.Handle("/metrics", promhttp.Handler()) http.HandleFunc("/health", func(rw http.ResponseWriter, req *http.Request) { - log.Debugln("Responding to health check") + logger.Debugln("Responding to health check") if HealthStatus.Load() { rw.WriteHeader(http.StatusOK) } else { @@ -118,7 +137,8 @@ func ListenAndServe(host string, port int) { } }) + logger.Info("Starting prometheus server", "addr", addr) if err := http.ListenAndServe(addr, nil); err != nil { - log.WithError(err).Warnln("Error serving prometheus") + logger.Warnln("Error serving prometheus", "err", err) } } diff --git a/queen.go b/queen.go index d789b78..e8f5566 100644 --- a/queen.go +++ b/queen.go @@ -3,6 +3,7 @@ package ants import ( "context" "fmt" + "strconv" "time" "github.com/google/uuid" @@ -11,13 +12,16 @@ import ( leveldb "github.com/ipfs/go-ds-leveldb" "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-kad-dht/ants" - pb "github.com/libp2p/go-libp2p-kad-dht/pb" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "github.com/probe-lab/ants-watch/db" + "github.com/probe-lab/ants-watch/metrics" "github.com/probe-lab/go-libdht/kad" "github.com/probe-lab/go-libdht/kad/key" "github.com/probe-lab/go-libdht/kad/key/bit256" @@ -39,6 +43,7 @@ type QueenConfig struct { NebulaDBConnString string BucketSize int UserAgent string + Telemetry *metrics.Telemetry } type Queen struct { @@ -182,7 +187,15 @@ func (q *Queen) consumeAntsEvents(ctx context.Context) { // cache agent version if evt.AgentVersion == "" { - evt.AgentVersion, _ = q.agentsCache.Get(evt.Remote.String()) + var found bool + evt.AgentVersion, found = q.agentsCache.Get(evt.Remote.String()) + q.cfg.Telemetry.CacheHitCounter.Add(ctx, 1, metric.WithAttributes( + attribute.String("hit", strconv.FormatBool(found)), + attribute.String("cache", "agent_version"), + )) + if found { + continue + } } else { q.agentsCache.Add(evt.Remote.String(), evt.AgentVersion) } @@ -190,7 +203,12 @@ func (q *Queen) consumeAntsEvents(ctx context.Context) { // cache protocols var protocols []protocol.ID if len(evt.Protocols) == 0 { - protocols, _ = q.protocolsCache.Get(evt.Remote.String()) + var found bool + protocols, found = q.protocolsCache.Get(evt.Remote.String()) + q.cfg.Telemetry.CacheHitCounter.Add(ctx, 1, metric.WithAttributes( + attribute.String("hit", strconv.FormatBool(found)), + attribute.String("cache", "protocols"), + )) } else { protocols = evt.Protocols q.protocolsCache.Add(evt.Remote.String(), evt.Protocols) @@ -214,12 +232,12 @@ func (q *Queen) consumeAntsEvents(ctx context.Context) { StartedAt: evt.Timestamp, KeyID: evt.Target.B58String(), MultiAddresses: maddrStrs, - IsSelfLookup: peer.ID(evt.Target) == evt.Remote && evt.Type == pb.Message_FIND_NODE, } requests = append(requests, request) if len(requests) >= q.cfg.BatchSize { + if err = q.clickhouseClient.BulkInsertRequests(ctx, requests); err != nil { logger.Errorf("Error inserting requests: %v", err) } @@ -335,6 +353,8 @@ func (q *Queen) routine(ctx context.Context) { q.ants = append(q.ants, ant) } + q.cfg.Telemetry.AntsCountGauge.Record(ctx, int64(len(q.ants))) + logger.Debugf("ants count: %d", len(q.ants)) logger.Debug("queen routine over") }