Skip to content

Commit

Permalink
Collect metrics from the gateway (#2171)
Browse files Browse the repository at this point in the history
  • Loading branch information
zkokelj authored Jan 13, 2025
1 parent f383afa commit c7e8f8a
Show file tree
Hide file tree
Showing 6 changed files with 360 additions and 2 deletions.
189 changes: 189 additions & 0 deletions tools/walletextension/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package metrics

import (
"crypto/sha256"
"encoding/hex"
"log"
"sync"
"sync/atomic"
"time"

"github.com/ten-protocol/go-ten/tools/walletextension/storage/database/cosmosdb"
)

const (
// Persistence intervals (how often metrics are saved to CosmosDB)
MetricsPersistInterval = 10 * time.Minute

// Cleanup intervals (how often inactive users are cleaned up)
InactiveUserCleanupInterval = 1 * time.Hour

// Activity thresholds
UserInactivityThreshold = 30 * 24 * time.Hour // 30 days
MonthlyActiveUserWindow = 30 * 24 * time.Hour // 30 days
)

// Metrics interface defines the metrics operations
type Metrics interface {
RecordNewUser()
RecordAccountRegistered()
RecordUserActivity(anonymousID string)
GetTotalUsers() uint64
GetTotalAccountsRegistered() uint64
GetMonthlyActiveUsers() int
Stop()
}

type MetricsTracker struct {
totalUsers atomic.Uint64
accountsRegistered atomic.Uint64
activeUsers map[string]time.Time // key is double-hashed userID
activeUserLock sync.RWMutex
storage *cosmosdb.MetricsStorageCosmosDB
persistTicker *time.Ticker
}

func NewMetricsTracker(storage *cosmosdb.MetricsStorageCosmosDB) Metrics {
mt := &MetricsTracker{
activeUsers: make(map[string]time.Time),
storage: storage,
persistTicker: time.NewTicker(MetricsPersistInterval),
}

// Load existing metrics
if metrics, err := storage.LoadMetrics(); err == nil {
mt.totalUsers.Store(metrics.TotalUsers)
mt.accountsRegistered.Store(metrics.AccountsRegistered)

mt.activeUserLock.Lock()
for hashedUserID, timestamp := range metrics.ActiveUsers {
if t, err := time.Parse(time.RFC3339, timestamp); err == nil {
mt.activeUsers[hashedUserID] = t
}
}
mt.activeUserLock.Unlock()
}

// Start cleanup routine for inactive users
go mt.cleanupInactiveUsers()
go mt.persistMetrics()

return mt
}

// hashUserID creates a double-hashed version of the userID
func (mt *MetricsTracker) hashUserID(userID []byte) string {
// First hash
firstHash := sha256.Sum256(userID)
// Second hash
secondHash := sha256.Sum256(firstHash[:])
return hex.EncodeToString(secondHash[:])
}

func (mt *MetricsTracker) RecordNewUser() {
mt.totalUsers.Add(1)
}

// RecordAccountRegistered increments the total number of registered accounts
func (mt *MetricsTracker) RecordAccountRegistered() {
mt.accountsRegistered.Add(1)
}

// RecordUserActivity updates the last activity timestamp for a user
func (mt *MetricsTracker) RecordUserActivity(anonymousID string) {
hashedUserID := mt.hashUserID([]byte(anonymousID))

mt.activeUserLock.Lock()
mt.activeUsers[hashedUserID] = time.Now()
mt.activeUserLock.Unlock()
}

// GetTotalUsers returns the total number of registered users
func (mt *MetricsTracker) GetTotalUsers() uint64 {
return mt.totalUsers.Load()
}

// GetTotalAccountsRegistered returns the total number of registered accounts
func (mt *MetricsTracker) GetTotalAccountsRegistered() uint64 {
return mt.accountsRegistered.Load()
}

// GetMonthlyActiveUsers returns the number of users active in the last 30 days
func (mt *MetricsTracker) GetMonthlyActiveUsers() int {
mt.activeUserLock.RLock()
defer mt.activeUserLock.RUnlock()

count := 0
activeThreshold := time.Now().Add(-MonthlyActiveUserWindow)

for _, lastActive := range mt.activeUsers {
if lastActive.After(activeThreshold) {
count++
}
}
return count
}

// persistMetrics periodically saves metrics to CosmosDB
func (mt *MetricsTracker) persistMetrics() {
for range mt.persistTicker.C {
mt.saveMetrics()
}
}

func (mt *MetricsTracker) saveMetrics() {
mt.activeUserLock.RLock()
activeUsersMap := make(map[string]string)
for hashedUserID, timestamp := range mt.activeUsers {
activeUsersMap[hashedUserID] = timestamp.UTC().Format(time.RFC3339)
}
mt.activeUserLock.RUnlock()

metrics := &cosmosdb.MetricsDocument{
ID: cosmosdb.METRICS_DOC_ID,
TotalUsers: mt.totalUsers.Load(),
AccountsRegistered: mt.accountsRegistered.Load(),
ActiveUsers: activeUsersMap,
}

if err := mt.storage.SaveMetrics(metrics); err != nil {
// Either log the error properly or return it
log.Printf("Failed to persist metrics: %v", err)
}
}

func (mt *MetricsTracker) cleanupInactiveUsers() {
ticker := time.NewTicker(InactiveUserCleanupInterval)
for range ticker.C {
mt.activeUserLock.Lock()
inactiveThreshold := time.Now().Add(-UserInactivityThreshold)

for userID, lastActive := range mt.activeUsers {
if lastActive.Before(inactiveThreshold) {
delete(mt.activeUsers, userID)
}
}
mt.activeUserLock.Unlock()
}
}

// Stop cleanly stops the metrics tracker
func (mt *MetricsTracker) Stop() {
mt.persistTicker.Stop()
mt.saveMetrics() // Final save before stopping
}

// NoOpMetricsTracker implements Metrics interface but does nothing
type NoOpMetricsTracker struct{}

func NewNoOpMetricsTracker() Metrics {
return &NoOpMetricsTracker{}
}

func (mt *NoOpMetricsTracker) RecordNewUser() {}
func (mt *NoOpMetricsTracker) RecordAccountRegistered() {}
func (mt *NoOpMetricsTracker) RecordUserActivity(string) {}
func (mt *NoOpMetricsTracker) GetTotalUsers() uint64 { return 0 }
func (mt *NoOpMetricsTracker) GetTotalAccountsRegistered() uint64 { return 0 }
func (mt *NoOpMetricsTracker) GetMonthlyActiveUsers() int { return 0 }
func (mt *NoOpMetricsTracker) Stop() {}
2 changes: 2 additions & 0 deletions tools/walletextension/rpcapi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func ExecAuthRPC[R any](ctx context.Context, w *services.Services, cfg *AuthExec
return nil, err
}

w.MetricsTracker.RecordUserActivity(hexutils.BytesToHex(user.ID))

rateLimitAllowed, requestUUID := w.RateLimiter.Allow(gethcommon.Address(user.ID))
defer w.RateLimiter.SetRequestEnd(gethcommon.Address(user.ID), requestUUID)
if !rateLimitAllowed {
Expand Down
9 changes: 8 additions & 1 deletion tools/walletextension/services/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/status-im/keycard-go/hexutils"

"github.com/ten-protocol/go-ten/tools/walletextension/cache"
"github.com/ten-protocol/go-ten/tools/walletextension/metrics"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
Expand Down Expand Up @@ -49,6 +50,7 @@ type Services struct {
Config *common.Config
NewHeadsService *subscriptioncommon.NewHeadsService
cacheInvalidationCh chan *tencommon.BatchHeader
MetricsTracker metrics.Metrics
}

type NewHeadNotifier interface {
Expand All @@ -58,7 +60,7 @@ type NewHeadNotifier interface {
// number of rpc responses to cache
const rpcResponseCacheSize = 1_000_000

func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserStorage, stopControl *stopcontrol.StopControl, version string, logger gethlog.Logger, config *common.Config) *Services {
func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserStorage, stopControl *stopcontrol.StopControl, version string, logger gethlog.Logger, metricsTracker metrics.Metrics, config *common.Config) *Services {
newGatewayCache, err := cache.NewCache(rpcResponseCacheSize, logger)
if err != nil {
logger.Error(fmt.Errorf("could not create cache. Cause: %w", err).Error())
Expand All @@ -80,6 +82,7 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserSto
RateLimiter: rateLimiter,
Config: config,
cacheInvalidationCh: make(chan *tencommon.BatchHeader),
MetricsTracker: metricsTracker,
}

services.NewHeadsService = subscriptioncommon.NewNewHeadsService(
Expand Down Expand Up @@ -185,6 +188,7 @@ func (w *Services) GenerateAndStoreNewUser() ([]byte, error) {
w.Logger().Error(fmt.Sprintf("failed to save user to the database: %s", err))
return nil, err
}
w.MetricsTracker.RecordNewUser()

requestEndTime := time.Now()
duration := requestEndTime.Sub(requestStartTime)
Expand All @@ -194,6 +198,7 @@ func (w *Services) GenerateAndStoreNewUser() ([]byte, error) {

// AddAddressToUser checks if a message is in correct format and if signature is valid. If all checks pass we save address and signature against userID
func (w *Services) AddAddressToUser(userID []byte, address string, signature []byte, signatureType viewingkey.SignatureType) error {
w.MetricsTracker.RecordUserActivity(hexutils.BytesToHex(userID))
audit(w, "Adding address to user: %s, address: %s", hexutils.BytesToHex(userID), address)
requestStartTime := time.Now()
addressFromMessage := gethcommon.HexToAddress(address)
Expand All @@ -213,13 +218,15 @@ func (w *Services) AddAddressToUser(userID []byte, address string, signature []b
w.Logger().Error(fmt.Errorf("error while storing account (%s) for user (%s): %w", addressFromMessage.Hex(), userID, err).Error())
return err
}
w.MetricsTracker.RecordAccountRegistered()

audit(w, "Storing new address for user: %s, address: %s, duration: %d ", hexutils.BytesToHex(userID), address, time.Since(requestStartTime).Milliseconds())
return nil
}

// UserHasAccount checks if provided account exist in the database for given userID
func (w *Services) UserHasAccount(userID []byte, address string) (bool, error) {
w.MetricsTracker.RecordUserActivity(hexutils.BytesToHex(userID))
audit(w, "Checking if user has account: %s, address: %s", hexutils.BytesToHex(userID), address)
addressBytes, err := hex.DecodeString(address[2:]) // remove 0x prefix from address
if err != nil {
Expand Down
133 changes: 133 additions & 0 deletions tools/walletextension/storage/database/cosmosdb/metrics_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package cosmosdb

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
)

const (
METRICS_CONTAINER_NAME = "metrics"
METRICS_DOC_ID = "global_metrics"
)

type MetricsDocument struct {
ID string `json:"id"`
TotalUsers uint64 `json:"totalUsers"`
AccountsRegistered uint64 `json:"accountsRegistered"`
ActiveUsers map[string]string `json:"activeUsers"` // double-hashed userID -> ISO timestamp
ActiveUsersCount int `json:"activeUsersCount"`
LastUpdated string `json:"lastUpdated"`
}

// MetricsStorageCosmosDB handles metrics persistence in CosmosDB
type MetricsStorageCosmosDB struct {
client *azcosmos.Client
metricsContainer *azcosmos.ContainerClient
}

func NewMetricsStorage(connectionString string) (*MetricsStorageCosmosDB, error) {
client, err := azcosmos.NewClientFromConnectionString(connectionString, nil)
if err != nil {
return nil, fmt.Errorf("failed to create CosmosDB client: %w", err)
}

ctx := context.Background()

// Ensure database exists
_, err = client.CreateDatabase(ctx, azcosmos.DatabaseProperties{ID: DATABASE_NAME}, nil)
if err != nil && !strings.Contains(err.Error(), "Conflict") {
return nil, fmt.Errorf("failed to create database: %w", err)
}

metricsContainer, err := client.NewContainer(DATABASE_NAME, METRICS_CONTAINER_NAME)
if err != nil {
return nil, fmt.Errorf("failed to get metrics container: %w", err)
}

return &MetricsStorageCosmosDB{
client: client,
metricsContainer: metricsContainer,
}, nil
}

func (m *MetricsStorageCosmosDB) LoadMetrics() (*MetricsDocument, error) {
ctx := context.Background()
partitionKey := azcosmos.NewPartitionKeyString(METRICS_DOC_ID)

response, err := m.metricsContainer.ReadItem(ctx, partitionKey, METRICS_DOC_ID, nil)
if err != nil {
if strings.Contains(err.Error(), "NotFound") {
// Initialize with empty metrics if not found
return &MetricsDocument{
ID: METRICS_DOC_ID,
ActiveUsers: make(map[string]string),
}, nil
}
return nil, err
}

var doc MetricsDocument
if err := json.Unmarshal(response.Value, &doc); err != nil {
return nil, err
}
return &doc, nil
}

func (m *MetricsStorageCosmosDB) SaveMetrics(metrics *MetricsDocument) error {
ctx := context.Background()
partitionKey := azcosmos.NewPartitionKeyString(METRICS_DOC_ID)

// Calculate active users count and clean up inactive users
activeThreshold := time.Now().Add(-30 * 24 * time.Hour) // 30 days
activeCount := 0
activeUsersMap := make(map[string]string)

for userID, timestampStr := range metrics.ActiveUsers {
if timestamp, err := time.Parse(time.RFC3339, timestampStr); err == nil {
if timestamp.After(activeThreshold) {
activeCount++
activeUsersMap[userID] = timestampStr
}
}
}

metrics.ActiveUsers = activeUsersMap // Only keep active users
metrics.ActiveUsersCount = activeCount
metrics.LastUpdated = time.Now().UTC().Format(time.RFC3339)

docJSON, err := json.Marshal(metrics)
if err != nil {
return err
}

_, err = m.metricsContainer.UpsertItem(ctx, partitionKey, docJSON, nil)
return err
}

// NoOpMetricsStorage is a no-op implementation of metrics storage
type noOpMetricsStorage struct{}

// MetricsStorage interface defines the metrics storage operations
type MetricsStorage interface {
LoadMetrics() (*MetricsDocument, error)
SaveMetrics(*MetricsDocument) error
}

func NewNoOpMetricsStorage() MetricsStorage {
return &noOpMetricsStorage{}
}

func (n *noOpMetricsStorage) LoadMetrics() (*MetricsDocument, error) {
return &MetricsDocument{
ActiveUsers: make(map[string]string),
}, nil
}

func (n *noOpMetricsStorage) SaveMetrics(*MetricsDocument) error {
return nil
}
Loading

0 comments on commit c7e8f8a

Please sign in to comment.