From 070911e6a711d25c74c221d2cadacdac97c33f63 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 12 Dec 2024 15:23:45 -0500 Subject: [PATCH 1/3] Add cloudwatch service with support for batching metric data --- aws/cwatch/service.go | 64 ++++++++++++++++++++++++++++++++++++++ aws/cwatch/service_test.go | 22 +++++++++++++ aws/dynamo/service.go | 2 +- go.mod | 7 +++-- go.sum | 14 +++++---- 5 files changed, 99 insertions(+), 10 deletions(-) create mode 100644 aws/cwatch/service.go create mode 100644 aws/cwatch/service_test.go diff --git a/aws/cwatch/service.go b/aws/cwatch/service.go new file mode 100644 index 0000000..9905aa6 --- /dev/null +++ b/aws/cwatch/service.go @@ -0,0 +1,64 @@ +package cwatch + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" + "github.com/nyaruka/gocommon/syncx" +) + +type Service struct { + Client *cloudwatch.Client + namespace string + batcher *syncx.Batcher[types.MetricDatum] +} + +func NewService(accessKey, secretKey, region, namespace string, wg *sync.WaitGroup) (*Service, error) { + opts := []func(*config.LoadOptions) error{config.WithRegion(region)} + + if accessKey != "" && secretKey != "" { + opts = append(opts, config.WithCredentialsProvider(credentials.StaticCredentialsProvider{Value: aws.Credentials{ + AccessKeyID: accessKey, SecretAccessKey: secretKey, + }})) + } + + cfg, err := config.LoadDefaultConfig(context.TODO(), opts...) + if err != nil { + return nil, err + } + + client := cloudwatch.NewFromConfig(cfg) + s := &Service{Client: client, namespace: namespace} + s.batcher = syncx.NewBatcher(s.processBatch, 100, time.Second*3, 1000, wg) + + return s, nil +} + +func (s *Service) Start() { + s.batcher.Start() +} + +func (s *Service) Stop() { + s.batcher.Stop() +} + +func (s *Service) Queue(d types.MetricDatum) { + s.batcher.Queue(d) +} + +func (s *Service) processBatch(batch []types.MetricDatum) { + _, err := s.Client.PutMetricData(context.TODO(), &cloudwatch.PutMetricDataInput{ + Namespace: aws.String(s.namespace), + MetricData: batch, + }) + if err != nil { + slog.Error("error sending metrics to cloudwatch", "error", err, "count", len(batch)) + } +} diff --git a/aws/cwatch/service_test.go b/aws/cwatch/service_test.go new file mode 100644 index 0000000..6f65ecc --- /dev/null +++ b/aws/cwatch/service_test.go @@ -0,0 +1,22 @@ +package cwatch_test + +import ( + "sync" + "testing" + + "github.com/nyaruka/gocommon/aws/cwatch" + "github.com/stretchr/testify/assert" +) + +func TestService(t *testing.T) { + wg := &sync.WaitGroup{} + + svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", wg) + assert.NoError(t, err) + + svc.Start() + + svc.Stop() + + wg.Wait() +} diff --git a/aws/dynamo/service.go b/aws/dynamo/service.go index c0e14b3..f386b5d 100644 --- a/aws/dynamo/service.go +++ b/aws/dynamo/service.go @@ -17,7 +17,7 @@ type Service struct { tablePrefix string } -// NewService creates a new S3 service with the given credentials and configuration +// NewService creates a new dynamodb service with the given credentials and configuration func NewService(accessKey, secretKey, region, endpoint, tablePrefix string) (*Service, error) { opts := []func(*config.LoadOptions) error{config.WithRegion(region)} diff --git a/go.mod b/go.mod index a04747d..4bba986 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/nyaruka/gocommon go 1.23 require ( - github.com/aws/aws-sdk-go-v2 v1.32.5 + github.com/aws/aws-sdk-go-v2 v1.32.6 github.com/aws/aws-sdk-go-v2/config v1.28.5 github.com/aws/aws-sdk-go-v2/credentials v1.17.46 github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.17 @@ -30,10 +30,11 @@ require ( require ( github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.20 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.24 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.24 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.24 // indirect + github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3 github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.6 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.5 // indirect diff --git a/go.sum b/go.sum index dd2cb85..4cf3b00 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= -github.com/aws/aws-sdk-go-v2 v1.32.5 h1:U8vdWJuY7ruAkzaOdD7guwJjD06YSKmnKCJs7s3IkIo= -github.com/aws/aws-sdk-go-v2 v1.32.5/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U= +github.com/aws/aws-sdk-go-v2 v1.32.6 h1:7BokKRgRPuGmKkFMhEg/jSul+tB9VvXhcViILtfG8b4= +github.com/aws/aws-sdk-go-v2 v1.32.6/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 h1:lL7IfaFzngfx0ZwUGOZdsFFnQ5uLvR0hWqqhyE7Q9M8= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7/go.mod h1:QraP0UcVlQJsmHfioCrveWOC1nbiWUl3ej08h4mXWoc= github.com/aws/aws-sdk-go-v2/config v1.28.5 h1:Za41twdCXbuyyWv9LndXxZZv3QhTG1DinqlFsSuvtI0= @@ -12,14 +12,16 @@ github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.17 h1:36xxDfD github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.17/go.mod h1:A4XQVRy4yJ70Sk5Qz2tuCQX6J5kXcRa53nGP6wtgntM= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.20 h1:sDSXIrlsFSFJtWKLQS4PUWRvrT580rrnuLydJrCQ/yA= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.20/go.mod h1:WZ/c+w0ofps+/OUqMwWgnfrgzZH1DZO1RIkktICsqnY= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.24 h1:4usbeaes3yJnCFC7kfeyhkdkPtoRYPa/hTmCqMpKpLI= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.24/go.mod h1:5CI1JemjVwde8m2WG3cz23qHKPOxbpkq0HaoreEgLIY= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.24 h1:N1zsICrQglfzaBnrfM0Ys00860C+QFwu6u/5+LomP+o= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.24/go.mod h1:dCn9HbJ8+K31i8IQ8EWmWj0EiIk0+vKiHNMxTTYveAg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 h1:s/fF4+yDQDoElYhfIVvSNyeCydfbuTKzhxSXDXCPasU= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25/go.mod h1:IgPfDv5jqFIzQSNbUEMoitNooSMXjRSDkhXv8jiROvU= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 h1:ZntTCl5EsYnhN/IygQEUugpdwbhdkom9uHcbCftiGgA= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25/go.mod h1:DBdPrgeocww+CSl1C8cEV8PN1mHMBhuCDLpXezyvWkE= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.24 h1:JX70yGKLj25+lMC5Yyh8wBtvB01GDilyRuJvXJ4piD0= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.24/go.mod h1:+Ln60j9SUTD0LEwnhEB0Xhg61DHqplBrbZpLgyjoEHg= +github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3 h1:nQLG9irjDGUFXVPDHzjCGEEwh0hZ6BcxTvHOod1YsP4= +github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3/go.mod h1:URs8sqsyaxiAZkKP6tOEmhcs9j2ynFIomqOKY/CAHJc= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.37.1 h1:vucMirlM6D+RDU8ncKaSZ/5dGrXNajozVwpmWNPn2gQ= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.37.1/go.mod h1:fceORfs010mNxZbQhfqUjUeHlTwANmIT4mvHamuUaUg= github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.6 h1:hIl7Z1zcfdzsl5SiV32acFj4gY/cZ5Xr9wd6PpoNYGE= From 1b73386ed8eae6ab40ddf2bcc8cf6dc10919ae41 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 12 Dec 2024 15:42:43 -0500 Subject: [PATCH 2/3] Make AWS service creation a little drier --- aws/config.go | 22 ++++++++++++++++++++++ aws/cwatch/service.go | 14 +++----------- aws/dynamo/service.go | 13 ++----------- aws/s3x/service.go | 13 ++----------- 4 files changed, 29 insertions(+), 33 deletions(-) create mode 100644 aws/config.go diff --git a/aws/config.go b/aws/config.go new file mode 100644 index 0000000..d646c05 --- /dev/null +++ b/aws/config.go @@ -0,0 +1,22 @@ +package aws + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" +) + +// NewConfig creates a new AWS config with the given credentials and region +func NewConfig(accessKey, secretKey, region string) (aws.Config, error) { + opts := []func(*config.LoadOptions) error{config.WithRegion(region)} + + if accessKey != "" && secretKey != "" { + opts = append(opts, config.WithCredentialsProvider(credentials.StaticCredentialsProvider{Value: aws.Credentials{ + AccessKeyID: accessKey, SecretAccessKey: secretKey, + }})) + } + + return config.LoadDefaultConfig(context.TODO(), opts...) +} diff --git a/aws/cwatch/service.go b/aws/cwatch/service.go index 9905aa6..4ae233f 100644 --- a/aws/cwatch/service.go +++ b/aws/cwatch/service.go @@ -7,10 +7,9 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/cloudwatch" "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" + awsx "github.com/nyaruka/gocommon/aws" "github.com/nyaruka/gocommon/syncx" ) @@ -20,16 +19,9 @@ type Service struct { batcher *syncx.Batcher[types.MetricDatum] } +// NewService creates a new Cloudwatch service with the given credentials and configuration func NewService(accessKey, secretKey, region, namespace string, wg *sync.WaitGroup) (*Service, error) { - opts := []func(*config.LoadOptions) error{config.WithRegion(region)} - - if accessKey != "" && secretKey != "" { - opts = append(opts, config.WithCredentialsProvider(credentials.StaticCredentialsProvider{Value: aws.Credentials{ - AccessKeyID: accessKey, SecretAccessKey: secretKey, - }})) - } - - cfg, err := config.LoadDefaultConfig(context.TODO(), opts...) + cfg, err := awsx.NewConfig(accessKey, secretKey, region) if err != nil { return nil, err } diff --git a/aws/dynamo/service.go b/aws/dynamo/service.go index f386b5d..d059606 100644 --- a/aws/dynamo/service.go +++ b/aws/dynamo/service.go @@ -5,10 +5,9 @@ import ( "fmt" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + awsx "github.com/nyaruka/gocommon/aws" ) // Service is simple abstraction layer to work with a DynamoDB-compatible database @@ -19,15 +18,7 @@ type Service struct { // NewService creates a new dynamodb service with the given credentials and configuration func NewService(accessKey, secretKey, region, endpoint, tablePrefix string) (*Service, error) { - opts := []func(*config.LoadOptions) error{config.WithRegion(region)} - - if accessKey != "" && secretKey != "" { - opts = append(opts, config.WithCredentialsProvider(credentials.StaticCredentialsProvider{Value: aws.Credentials{ - AccessKeyID: accessKey, SecretAccessKey: secretKey, - }})) - } - - cfg, err := config.LoadDefaultConfig(context.TODO(), opts...) + cfg, err := awsx.NewConfig(accessKey, secretKey, region) if err != nil { return nil, err } diff --git a/aws/s3x/service.go b/aws/s3x/service.go index 2360343..38abdcd 100644 --- a/aws/s3x/service.go +++ b/aws/s3x/service.go @@ -9,10 +9,9 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" + awsx "github.com/nyaruka/gocommon/aws" ) // Service is simple abstraction layer to work with a S3-compatible storage service @@ -23,15 +22,7 @@ type Service struct { // NewService creates a new S3 service with the given credentials and configuration func NewService(accessKey, secretKey, region, endpoint string, minio bool) (*Service, error) { - opts := []func(*config.LoadOptions) error{config.WithRegion(region)} - - if accessKey != "" && secretKey != "" { - opts = append(opts, config.WithCredentialsProvider(credentials.StaticCredentialsProvider{Value: aws.Credentials{ - AccessKeyID: accessKey, SecretAccessKey: secretKey, - }})) - } - - cfg, err := config.LoadDefaultConfig(context.TODO(), opts...) + cfg, err := awsx.NewConfig(accessKey, secretKey, region) if err != nil { return nil, err } From 9b3775c79936131e9747ce5c6441323dedea1253 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 12 Dec 2024 16:19:09 -0500 Subject: [PATCH 3/3] Add method to add namespace and deployment to metric data --- aws/cwatch/service.go | 33 +++++++++++++++++++++++---------- aws/cwatch/service_test.go | 29 ++++++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/aws/cwatch/service.go b/aws/cwatch/service.go index 4ae233f..e38f8f4 100644 --- a/aws/cwatch/service.go +++ b/aws/cwatch/service.go @@ -14,20 +14,24 @@ import ( ) type Service struct { - Client *cloudwatch.Client - namespace string - batcher *syncx.Batcher[types.MetricDatum] + Client *cloudwatch.Client + namespace string + deployment types.Dimension + batcher *syncx.Batcher[types.MetricDatum] } // NewService creates a new Cloudwatch service with the given credentials and configuration -func NewService(accessKey, secretKey, region, namespace string, wg *sync.WaitGroup) (*Service, error) { +func NewService(accessKey, secretKey, region, namespace, deployment string, wg *sync.WaitGroup) (*Service, error) { cfg, err := awsx.NewConfig(accessKey, secretKey, region) if err != nil { return nil, err } - client := cloudwatch.NewFromConfig(cfg) - s := &Service{Client: client, namespace: namespace} + s := &Service{ + Client: cloudwatch.NewFromConfig(cfg), + namespace: namespace, + deployment: types.Dimension{Name: aws.String("Deployment"), Value: aws.String(deployment)}, + } s.batcher = syncx.NewBatcher(s.processBatch, 100, time.Second*3, 1000, wg) return s, nil @@ -45,11 +49,20 @@ func (s *Service) Queue(d types.MetricDatum) { s.batcher.Queue(d) } -func (s *Service) processBatch(batch []types.MetricDatum) { - _, err := s.Client.PutMetricData(context.TODO(), &cloudwatch.PutMetricDataInput{ +func (s *Service) Prepare(data []types.MetricDatum) *cloudwatch.PutMetricDataInput { + // add deployment as the first dimension to all metrics + for i := range data { + data[i].Dimensions = append([]types.Dimension{s.deployment}, data[i].Dimensions...) + } + + return &cloudwatch.PutMetricDataInput{ Namespace: aws.String(s.namespace), - MetricData: batch, - }) + MetricData: data, + } +} + +func (s *Service) processBatch(batch []types.MetricDatum) { + _, err := s.Client.PutMetricData(context.TODO(), s.Prepare(batch)) if err != nil { slog.Error("error sending metrics to cloudwatch", "error", err, "count", len(batch)) } diff --git a/aws/cwatch/service_test.go b/aws/cwatch/service_test.go index 6f65ecc..4b20b3e 100644 --- a/aws/cwatch/service_test.go +++ b/aws/cwatch/service_test.go @@ -4,6 +4,9 @@ import ( "sync" "testing" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" "github.com/nyaruka/gocommon/aws/cwatch" "github.com/stretchr/testify/assert" ) @@ -11,9 +14,33 @@ import ( func TestService(t *testing.T) { wg := &sync.WaitGroup{} - svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", wg) + svc, err := cwatch.NewService("root", "key", "us-east-1", "Foo", "testing", wg) assert.NoError(t, err) + assert.Equal(t, &cloudwatch.PutMetricDataInput{ + Namespace: aws.String("Foo"), + MetricData: []types.MetricDatum{ + { + MetricName: aws.String("NumGoats"), + Dimensions: []types.Dimension{ + {Name: aws.String("Deployment"), Value: aws.String("testing")}, + }, + Value: aws.Float64(10), + }, + { + MetricName: aws.String("NumSheep"), + Dimensions: []types.Dimension{ + {Name: aws.String("Deployment"), Value: aws.String("testing")}, + {Name: aws.String("Host"), Value: aws.String("foo1")}, + }, + Value: aws.Float64(20), + }, + }, + }, svc.Prepare([]types.MetricDatum{ + {MetricName: aws.String("NumGoats"), Value: aws.Float64(10)}, + {MetricName: aws.String("NumSheep"), Dimensions: []types.Dimension{{Name: aws.String("Host"), Value: aws.String("foo1")}}, Value: aws.Float64(20)}, + })) + svc.Start() svc.Stop()