Skip to content

Commit

Permalink
add: node population aggregation table
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra committed Dec 16, 2024
1 parent e322336 commit 73a7ccf
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 26 deletions.
76 changes: 76 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package ants

import (
"regexp"
"strconv"
"strings"
)

var (
semverRegex = regexp.MustCompile(`.*(?P<major>0|[1-9]\d*)\.(?P<minor>0|[1-9]\d*)\.(?P<patch>0|[1-9]\d*).*`)
hexRegex = regexp.MustCompile(`^[a-fA-F0-9]+$`)
)

type agentVersionInfo struct {
full string
typ string
major int
minor int
patch int
hash string
}

func (avi *agentVersionInfo) Semver() [3]int {
return [3]int{avi.major, avi.minor, avi.patch}
}

func parseAgentVersion(av string) agentVersionInfo {
avi := agentVersionInfo{
full: av,
}

switch {
case av == "":
return avi
case av == "celestia-celestia":
avi.typ = "celestia-celestia"
return avi
case strings.HasPrefix(av, "celestia-node/celestia/"):
// fallthrough
default:
avi.typ = "other"
return avi
}

parts := strings.Split(av, "/")
if len(parts) > 2 {
switch parts[2] {
case "bridge", "full", "light":
avi.typ = parts[2]
default:
avi.typ = "other"
}
}

if len(parts) > 3 {
matches := semverRegex.FindStringSubmatch(parts[3])
if matches != nil {
for i, name := range semverRegex.SubexpNames() {
switch name {
case "major":
avi.major, _ = strconv.Atoi(matches[i])
case "minor":
avi.minor, _ = strconv.Atoi(matches[i])
case "patch":
avi.patch, _ = strconv.Atoi(matches[i])
}
}
}
}

if len(parts) > 4 && hexRegex.MatchString(parts[4]) {
avi.hash = parts[4]
}

return avi
}
113 changes: 113 additions & 0 deletions agent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package ants

import (
"reflect"
"testing"
)

func Test_parseAgentVersion(t *testing.T) {
tests := []struct {
av string
want agentVersionInfo
}{
{
av: "",
want: agentVersionInfo{},
},
{
av: "celestia-node/celestia/bridge/v0.17.1/078c291",
want: agentVersionInfo{
full: "celestia-node/celestia/bridge/v0.17.1/078c291",
typ: "bridge",
major: 0,
minor: 17,
patch: 1,
hash: "078c291",
},
},
{
av: "celestia-node/celestia/full/v0.17.2/57f8bd8",
want: agentVersionInfo{
full: "celestia-node/celestia/full/v0.17.2/57f8bd8",
typ: "full",
major: 0,
minor: 17,
patch: 2,
hash: "57f8bd8",
},
},
{
av: "celestia-node/celestia/random/v4.46.6/57f8bd8",
want: agentVersionInfo{
full: "celestia-node/celestia/random/v4.46.6/57f8bd8",
typ: "other",
major: 4,
minor: 46,
patch: 6,
hash: "57f8bd8",
},
},
{
av: "celestia-node/celestia/light/vv0.14.0/13439cc",
want: agentVersionInfo{
full: "celestia-node/celestia/light/vv0.14.0/13439cc",
typ: "light",
major: 0,
minor: 14,
patch: 0,
hash: "13439cc",
},
},
{
av: "celestia-node/celestia/light/v0.20.3-15-gbd3105b9/bd3105b",
want: agentVersionInfo{
full: "celestia-node/celestia/light/v0.20.3-15-gbd3105b9/bd3105b",
typ: "light",
major: 0,
minor: 20,
patch: 3,
hash: "bd3105b",
},
},
{
av: "celestia-node/celestia/full/v0.18.0-refs-tags-v0-20-1-mocha.0/353141f",
want: agentVersionInfo{
full: "celestia-node/celestia/full/v0.18.0-refs-tags-v0-20-1-mocha.0/353141f",
typ: "full",
major: 0,
minor: 18,
patch: 0,
hash: "353141f",
},
},
{
av: "celestia-node/celestia/light/unknown/unknown",
want: agentVersionInfo{
full: "celestia-node/celestia/light/unknown/unknown",
typ: "light",
},
},
{
av: "celestia-celestia",
want: agentVersionInfo{
full: "celestia-celestia",
typ: "celestia-celestia",
},
},
{
av: "celestiant",
want: agentVersionInfo{
full: "celestiant",
typ: "other",
},
},
}

for _, tt := range tests {
t.Run(tt.av, func(t *testing.T) {
if got := parseAgentVersion(tt.av); !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseAgentVersion() = %v, want %v", got, tt.want)
}
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE requests
DROP COLUMN IF EXISTS agent_version_type,
DROP COLUMN IF EXISTS agent_version_semver;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE requests
ADD COLUMN agent_version_type LowCardinality(String) AFTER agent_version,
ADD COLUMN agent_version_semver Array(Int16) AFTER agent_version_type;
1 change: 1 addition & 0 deletions db/migrations/000003_create_node_population_table.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS node_population;
7 changes: 7 additions & 0 deletions db/migrations/000003_create_node_population_table.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE node_population (
timestamp DateTime,
remote_multihash AggregateFunction(uniqExact, String),
agent_version_type LowCardinality(String),
agent_version_semver Array(Int16)
) ENGINE = AggregatingMergeTree()
PRIMARY KEY (timestamp, agent_version_type)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS node_population_mv;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE MATERIALIZED VIEW node_population_mv TO node_population AS
SELECT
toStartOfTenMinutes(started_at) as timestamp,
uniqExactState(remote_multihash) as remote_multihash,
agent_version_type,
agent_version_semver
FROM requests
GROUP BY timestamp, agent_version_type, agent_version_semver
1 change: 1 addition & 0 deletions db/migrations/000005_alter_requests_table_ttl.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE requests MODIFY TTL toDateTime(started_at) + INTERVAL 180 DAY;
1 change: 1 addition & 0 deletions db/migrations/000005_alter_requests_table_ttl.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE requests MODIFY TTL toDateTime(started_at) + INTERVAL 90 DAY;
22 changes: 12 additions & 10 deletions db/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ import (
)

type Request struct {
UUID uuid.UUID `ch:"id"`
QueenID string `ch:"queen_id"`
AntID peer.ID `ch:"ant_multihash"`
RemoteID peer.ID `ch:"remote_multihash"`
RequestType pb.Message_MessageType `ch:"request_type"`
AgentVersion string `ch:"agent_version"`
Protocols []string `ch:"protocols"`
StartedAt time.Time `ch:"started_at"`
KeyID string `ch:"key_multihash"`
MultiAddresses []string `ch:"multi_addresses"`
UUID uuid.UUID `ch:"id"`
QueenID string `ch:"queen_id"`
AntID peer.ID `ch:"ant_multihash"`
RemoteID peer.ID `ch:"remote_multihash"`
RequestType pb.Message_MessageType `ch:"request_type"`
AgentVersion string `ch:"agent_version"`
AgentVersionType string `ch:"agent_version_type"`
AgentVersionSemVer [3]int `ch:"agent_version_semver"`
Protocols []string `ch:"protocols"`
StartedAt time.Time `ch:"started_at"`
KeyID string `ch:"key_multihash"`
MultiAddresses []string `ch:"multi_addresses"`
}
37 changes: 21 additions & 16 deletions queen.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ants
import (
"context"
"fmt"
"sort"
"strconv"
"time"

Expand Down Expand Up @@ -55,7 +56,7 @@ type Queen struct {

peerstore peerstore.Peerstore
datastore ds.Batching
agentsCache *lru.Cache[string, string]
agentsCache *lru.Cache[string, agentVersionInfo]
protocolsCache *lru.Cache[string, []protocol.ID]

ants []*Ant
Expand All @@ -79,7 +80,7 @@ func NewQueen(clickhouseClient db.Client, cfg *QueenConfig) (*Queen, error) {
return nil, fmt.Errorf("creating in-memory leveldb: %w", err)
}

agentsCache, err := lru.New[string, string](cfg.CacheSize)
agentsCache, err := lru.New[string, agentVersionInfo](cfg.CacheSize)
if err != nil {
return nil, fmt.Errorf("init agents cache: %w", err)
}
Expand Down Expand Up @@ -185,16 +186,18 @@ func (q *Queen) consumeAntsEvents(ctx context.Context) {
maddrStrs[i] = maddr.String()
}

avi := parseAgentVersion(evt.AgentVersion)

// cache agent version
if evt.AgentVersion == "" {
if avi.full == "" {
var found bool
evt.AgentVersion, found = q.agentsCache.Get(evt.Remote.String())
avi, found = q.agentsCache.Get(evt.Remote.String())
q.cfg.Telemetry.CacheHitCounter.Add(ctx, 1, metric.WithAttributes(
attribute.String("hit", strconv.FormatBool(found)),
attribute.String("cache", "agent_version"),
))
} else {
q.agentsCache.Add(evt.Remote.String(), evt.AgentVersion)
q.agentsCache.Add(evt.Remote.String(), avi)
}

// cache protocols
Expand All @@ -211,6 +214,7 @@ func (q *Queen) consumeAntsEvents(ctx context.Context) {
q.protocolsCache.Add(evt.Remote.String(), evt.Protocols)
}
protocolStrs := protocol.ConvertToStrings(protocols)
sort.Strings(protocolStrs)

uuidv7, err := uuid.NewV7()
if err != nil {
Expand All @@ -219,22 +223,23 @@ func (q *Queen) consumeAntsEvents(ctx context.Context) {
}

request := &db.Request{
UUID: uuidv7,
QueenID: q.id,
AntID: evt.Self,
RemoteID: evt.Remote,
RequestType: evt.Type,
AgentVersion: evt.AgentVersion,
Protocols: protocolStrs,
StartedAt: evt.Timestamp,
KeyID: evt.Target.B58String(),
MultiAddresses: maddrStrs,
UUID: uuidv7,
QueenID: q.id,
AntID: evt.Self,
RemoteID: evt.Remote,
RequestType: evt.Type,
AgentVersion: evt.AgentVersion,
AgentVersionType: avi.typ,
AgentVersionSemVer: avi.Semver(),
Protocols: protocolStrs,
StartedAt: evt.Timestamp,
KeyID: evt.Target.B58String(),
MultiAddresses: maddrStrs,
}

requests = append(requests, request)

if len(requests) >= q.cfg.BatchSize {

if err = q.clickhouseClient.BulkInsertRequests(ctx, requests); err != nil {
logger.Errorf("Error inserting requests: %v", err)
}
Expand Down

0 comments on commit 73a7ccf

Please sign in to comment.