Skip to content

Commit

Permalink
add prometheus operational metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra committed Dec 3, 2024
1 parent 54d1c52 commit 902c22a
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 32 deletions.
1 change: 1 addition & 0 deletions ant.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func SpawnAnt(ctx context.Context, ps peerstore.Peerstore, ds ds.Batching, cfg *
libp2p.Peerstore(ps),
libp2p.DisableRelay(),
libp2p.ListenAddrStrings(listenAddrs...),
libp2p.DisableMetrics(),
}

if cfg.Port == 0 {
Expand Down
38 changes: 37 additions & 1 deletion cmd/ants/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@ import (
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"go.opentelemetry.io/otel/trace/noop"

"github.com/probe-lab/ants-watch"
"github.com/probe-lab/ants-watch/db"
"github.com/urfave/cli/v2"
"github.com/probe-lab/ants-watch/metrics"
)

var logger = logging.Logger("ants-queen")

var rootConfig = struct {
MetricsHost string
MetricsPort int
ClickhouseAddress string
ClickhouseDatabase string
ClickhouseUsername string
Expand All @@ -36,6 +41,8 @@ var rootConfig = struct {
ProtocolPrefix string
QueenID string
}{
MetricsHost: "127.0.0.1",
MetricsPort: 5999, // one below the FirstPort to not accidentally override it
ClickhouseAddress: "",
ClickhouseDatabase: "",
ClickhouseUsername: "",
Expand Down Expand Up @@ -68,6 +75,20 @@ func main() {
Name: "queen",
Usage: "Starts the queen service",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "metrics.host",
Usage: "On which host to expose the metrics",
EnvVars: []string{"ANTS_METRICS_HOST"},
Destination: &rootConfig.MetricsHost,
Value: rootConfig.MetricsHost,
},
&cli.IntFlag{
Name: "metrics.port",
Usage: "On which port to expose the metrics",
EnvVars: []string{"ANTS_METRICS_PORT"},
Destination: &rootConfig.MetricsPort,
Value: rootConfig.MetricsPort,
},
&cli.StringFlag{
Name: "clickhouse.address",
Usage: "ClickHouse address containing the host and port, 127.0.0.1:9000",
Expand Down Expand Up @@ -216,13 +237,27 @@ func main() {
func runQueenCommand(c *cli.Context) error {
ctx := c.Context

meterProvider, err := metrics.NewMeterProvider()
if err != nil {
return fmt.Errorf("init meter provider: %w", err)
}

telemetry, err := metrics.NewTelemetry(noop.NewTracerProvider(), meterProvider)
if err != nil {
return fmt.Errorf("init telemetry: %w", err)
}

logger.Debugln("Starting metrics server", "host", rootConfig.MetricsHost, "port", rootConfig.MetricsPort)
go metrics.ListenAndServe(rootConfig.MetricsHost, rootConfig.MetricsPort)

// initializing a new clickhouse client
client, err := db.NewClient(
rootConfig.ClickhouseAddress,
rootConfig.ClickhouseDatabase,
rootConfig.ClickhouseUsername,
rootConfig.ClickhousePassword,
rootConfig.ClickhouseSSL,
telemetry,
)
if err != nil {
return fmt.Errorf("init database client: %w", err)
Expand All @@ -247,6 +282,7 @@ func runQueenCommand(c *cli.Context) error {
NebulaDBConnString: rootConfig.NebulaDBConnString,
BucketSize: rootConfig.BucketSize,
UserAgent: rootConfig.UserAgent,
Telemetry: telemetry,
}

// initializing queen
Expand Down
20 changes: 17 additions & 3 deletions db/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import (
"crypto/tls"
"fmt"
"net"
"strconv"
"time"

"github.com/probe-lab/ants-watch/metrics"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
Expand All @@ -21,11 +27,12 @@ type Client interface {

type ClickhouseClient struct {
driver.Conn
telemetry *metrics.Telemetry
}

var _ Client = (*ClickhouseClient)(nil)

func NewClient(address, database, username, password string, ssl bool) (*ClickhouseClient, error) {
func NewClient(address, database, username, password string, ssl bool, telemetry *metrics.Telemetry) (*ClickhouseClient, error) {
logger.Infoln("Creating new clickhouse client...")

conn, err := clickhouse.Open(&clickhouse.Options{
Expand All @@ -51,13 +58,20 @@ func NewClient(address, database, username, password string, ssl bool) (*Clickho
}

client := &ClickhouseClient{
Conn: conn,
Conn: conn,
telemetry: telemetry,
}

return client, nil
}

func (c *ClickhouseClient) BulkInsertRequests(ctx context.Context, requests []*Request) error {
func (c *ClickhouseClient) BulkInsertRequests(ctx context.Context, requests []*Request) (err error) {
start := time.Now()
defer func() {
c.telemetry.BulkInsertCounter.Add(ctx, 1, metric.WithAttributes(attribute.String("success", strconv.FormatBool(err == nil))))
c.telemetry.BulkInsertSizeHist.Record(ctx, int64(len(requests)))
c.telemetry.BulkInsertLatencyMsHist.Record(ctx, time.Since(start).Milliseconds())
}()
batch, err := c.Conn.PrepareBatch(ctx, "INSERT INTO requests", driver.WithReleaseConnection())
if err != nil {
return fmt.Errorf("prepare batch: %w", err)
Expand Down
9 changes: 4 additions & 5 deletions db/migrations/000001_create_requests_table.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ CREATE TABLE requests
remote_multihash String,
agent_version String,
protocols Array(String),
started_at DateTime,
started_at DateTime64(3),
request_type String,
key_multihash String,
multi_addresses Array(String),
is_self_lookup bool
) ENGINE = ReplicatedMergeTree
multi_addresses Array(String)
) ENGINE = MergeTree
PRIMARY KEY (started_at)
TTL started_at + INTERVAL 1 DAY;
TTL toDateTime(started_at) + INTERVAL 1 DAY;
1 change: 0 additions & 1 deletion db/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,4 @@ type Request struct {
StartedAt time.Time
KeyID string
MultiAddresses []string
IsSelfLookup bool
}
56 changes: 38 additions & 18 deletions metrics/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import (
"fmt"
"net/http"
_ "net/http/pprof"
"runtime"

"github.com/ipfs/go-log/v2"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/metric"
Expand All @@ -24,34 +23,57 @@ import (
"go.uber.org/atomic"
)

var logger = log.Logger("telemetry")

const (
MeterName = "github.com/probe-lab/ants-watch"
TracerName = "github.com/probe-lab/ants-watch"
)

type Telemetry struct {
Tracer trace.Tracer
CacheQueriesCount metric.Int64Counter
InsertRequestHistogram metric.Int64Histogram
Tracer trace.Tracer
AntsCountGauge metric.Int64Gauge
BulkInsertCounter metric.Int64Counter
BulkInsertSizeHist metric.Int64Histogram
BulkInsertLatencyMsHist metric.Int64Histogram
CacheHitCounter metric.Int64Counter
}

func NewTelemetry(tp trace.TracerProvider, mp metric.MeterProvider) (*Telemetry, error) {
meter := mp.Meter(MeterName)

cacheQueriesCount, err := meter.Int64Counter("cache_queries", metric.WithDescription("Number of queries to the LRU caches"))
antsCountGauge, err := meter.Int64Gauge("ants_count", metric.WithDescription("Number of running ants"))
if err != nil {
return nil, fmt.Errorf("ants_count gauge: %w", err)
}

bulkInsertCounter, err := meter.Int64Counter("bulk_insert_count", metric.WithDescription("Number of bulk inserts"))
if err != nil {
return nil, fmt.Errorf("cache_queries counter: %w", err)
return nil, fmt.Errorf("bulk_insert_count gauge: %w", err)
}

insertRequestHistogram, err := meter.Int64Histogram("insert_request_timing", metric.WithDescription("Histogram of database query times for request insertions"), metric.WithUnit("milliseconds"))
bulkInsertSizeHist, err := meter.Int64Histogram("bulk_insert_size", metric.WithDescription("Size of bulk inserts"), metric.WithExplicitBucketBoundaries(0, 10, 50, 100, 500, 1000))
if err != nil {
return nil, fmt.Errorf("cache_queries counter: %w", err)
return nil, fmt.Errorf("bulk_insert_size histogram: %w", err)
}

bulkInsertLatencyMsHist, err := meter.Int64Histogram("bulk_insert_latency", metric.WithDescription("Latency of bulk inserts (ms)"), metric.WithUnit("ms"))
if err != nil {
return nil, fmt.Errorf("bulk_insert_latency histogram: %w", err)
}

cacheHitCounter, err := meter.Int64Counter("cache_hit_count", metric.WithDescription("Number of cache hits"))
if err != nil {
return nil, fmt.Errorf("cache_hit_counter gauge: %w", err)
}

return &Telemetry{
Tracer: tp.Tracer(TracerName),
CacheQueriesCount: cacheQueriesCount,
InsertRequestHistogram: insertRequestHistogram,
Tracer: tp.Tracer(TracerName),
BulkInsertCounter: bulkInsertCounter,
BulkInsertSizeHist: bulkInsertSizeHist,
BulkInsertLatencyMsHist: bulkInsertLatencyMsHist,
CacheHitCounter: cacheHitCounter,
AntsCountGauge: antsCountGauge,
}, nil
}

Expand Down Expand Up @@ -103,22 +125,20 @@ func NewTracerProvider(ctx context.Context, host string, port int) (trace.Tracer
// `ants health`.
func ListenAndServe(host string, port int) {
addr := fmt.Sprintf("%s:%d", host, port)
log.WithField("addr", addr).Debugln("Starting telemetry endpoint")

// profile 1% of contention events
runtime.SetMutexProfileFraction(1)
logger.Debugln("Starting telemetry endpoint", "addr", addr)

http.Handle("/metrics", promhttp.Handler())
http.HandleFunc("/health", func(rw http.ResponseWriter, req *http.Request) {
log.Debugln("Responding to health check")
logger.Debugln("Responding to health check")
if HealthStatus.Load() {
rw.WriteHeader(http.StatusOK)
} else {
rw.WriteHeader(http.StatusServiceUnavailable)
}
})

logger.Info("Starting prometheus server", "addr", addr)
if err := http.ListenAndServe(addr, nil); err != nil {
log.WithError(err).Warnln("Error serving prometheus")
logger.Warnln("Error serving prometheus", "err", err)
}
}
28 changes: 24 additions & 4 deletions queen.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ants
import (
"context"
"fmt"
"strconv"
"time"

"github.com/google/uuid"
Expand All @@ -11,13 +12,16 @@ import (
leveldb "github.com/ipfs/go-ds-leveldb"
"github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-kad-dht/ants"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/probe-lab/ants-watch/db"
"github.com/probe-lab/ants-watch/metrics"
"github.com/probe-lab/go-libdht/kad"
"github.com/probe-lab/go-libdht/kad/key"
"github.com/probe-lab/go-libdht/kad/key/bit256"
Expand All @@ -39,6 +43,7 @@ type QueenConfig struct {
NebulaDBConnString string
BucketSize int
UserAgent string
Telemetry *metrics.Telemetry
}

type Queen struct {
Expand Down Expand Up @@ -182,15 +187,28 @@ func (q *Queen) consumeAntsEvents(ctx context.Context) {

// cache agent version
if evt.AgentVersion == "" {
evt.AgentVersion, _ = q.agentsCache.Get(evt.Remote.String())
var found bool
evt.AgentVersion, found = q.agentsCache.Get(evt.Remote.String())
q.cfg.Telemetry.CacheHitCounter.Add(ctx, 1, metric.WithAttributes(
attribute.String("hit", strconv.FormatBool(found)),
attribute.String("cache", "agent_version"),
))
if found {
continue
}
} else {
q.agentsCache.Add(evt.Remote.String(), evt.AgentVersion)
}

// cache protocols
var protocols []protocol.ID
if len(evt.Protocols) == 0 {
protocols, _ = q.protocolsCache.Get(evt.Remote.String())
var found bool
protocols, found = q.protocolsCache.Get(evt.Remote.String())
q.cfg.Telemetry.CacheHitCounter.Add(ctx, 1, metric.WithAttributes(
attribute.String("hit", strconv.FormatBool(found)),
attribute.String("cache", "protocols"),
))
} else {
protocols = evt.Protocols
q.protocolsCache.Add(evt.Remote.String(), evt.Protocols)
Expand All @@ -214,12 +232,12 @@ func (q *Queen) consumeAntsEvents(ctx context.Context) {
StartedAt: evt.Timestamp,
KeyID: evt.Target.B58String(),
MultiAddresses: maddrStrs,
IsSelfLookup: peer.ID(evt.Target) == evt.Remote && evt.Type == pb.Message_FIND_NODE,
}

requests = append(requests, request)

if len(requests) >= q.cfg.BatchSize {

if err = q.clickhouseClient.BulkInsertRequests(ctx, requests); err != nil {
logger.Errorf("Error inserting requests: %v", err)
}
Expand Down Expand Up @@ -335,6 +353,8 @@ func (q *Queen) routine(ctx context.Context) {
q.ants = append(q.ants, ant)
}

q.cfg.Telemetry.AntsCountGauge.Record(ctx, int64(len(q.ants)))

logger.Debugf("ants count: %d", len(q.ants))
logger.Debug("queen routine over")
}
Expand Down

0 comments on commit 902c22a

Please sign in to comment.