From 9553c99f59a38613f788c488585445a4423f6608 Mon Sep 17 00:00:00 2001 From: Mike Keesey Date: Tue, 7 Jan 2025 07:53:06 -0700 Subject: [PATCH] Transition metric writers away from accepting endpoint arg The writers we have used have de-facto only written to one endpoint. Some implementations such as the storage writer only ever supported one endpoint and ignored the endpoint argument provided to the Write method. This split was confusing and error prone. Future work involves adding alternate forwarders which will use potentially different protocols than our normal collector to ingestor flow. Instead of metric collectors having one instance of a writer and multiple endpoints configured, these metric writers now have a collection of writers to send their data to. --- cmd/collector/main.go | 23 +++++++++++------------ collector/otlp/logs_proxy.go | 5 +++-- collector/otlp/metrics.go | 20 ++++++++------------ collector/scraper.go | 21 ++++++++++----------- collector/scraper_test.go | 16 +++++++--------- collector/service.go | 17 ++++++++--------- pkg/promremote/client.go | 7 ++++++- pkg/promremote/promremote_test.go | 7 ++++--- pkg/promremote/proxy.go | 18 ++++++++---------- pkg/remote/types.go | 2 +- tools/cmd/write-load/main.go | 1 + 11 files changed, 67 insertions(+), 70 deletions(-) diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 1c1e7c054..14f8e322e 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -120,21 +120,20 @@ func realMain(ctx *cli.Context) error { cfg.ReplaceVariable("$(HOSTNAME)", hostname) - var endpoints []string + var endpoint string if cfg.Endpoint != "" { - for _, endpoint := range []string{cfg.Endpoint} { - u, err := url.Parse(endpoint) - if err != nil { - return fmt.Errorf("failed to parse endpoint %s: %w", endpoint, err) - } + endpoint = cfg.Endpoint - if u.Scheme != "http" && u.Scheme != "https" { - return fmt.Errorf("endpoint %s must be http or https", endpoint) - } + u, err := url.Parse(cfg.Endpoint) + if err != nil { + return fmt.Errorf("failed to parse endpoint %s: %w", endpoint, err) + } - logger.Infof("Using remote write endpoint %s", endpoint) - endpoints = append(endpoints, endpoint) + if u.Scheme != "http" && u.Scheme != "https" { + return fmt.Errorf("endpoint %s must be http or https", endpoint) } + + logger.Infof("Using remote write endpoint %s", endpoint) } if cfg.StorageDir == "" { @@ -298,7 +297,7 @@ func realMain(ctx *cli.Context) error { Scraper: scraperOpts, ListenAddr: cfg.ListenAddr, NodeName: hostname, - Endpoints: endpoints, + Endpoint: endpoint, LiftLabels: sortedLiftedLabels, AddAttributes: addAttributes, LiftAttributes: liftAttributes, diff --git a/collector/otlp/logs_proxy.go b/collector/otlp/logs_proxy.go index 11dcfec49..4e8c0b043 100644 --- a/collector/otlp/logs_proxy.go +++ b/collector/otlp/logs_proxy.go @@ -30,7 +30,7 @@ import ( type LogsProxyServiceOpts struct { AddAttributes map[string]string LiftAttributes []string - Endpoints []string + Endpoint string InsecureSkipVerify bool } @@ -60,7 +60,8 @@ func NewLogsProxyService(opts LogsProxyServiceOpts) *LogsProxyService { } rpcClients := make(map[string]logsv1connect.LogsServiceClient) - for _, endpoint := range opts.Endpoints { + if opts.Endpoint != "" { + endpoint := opts.Endpoint // We have to strip the path component from our endpoint so gRPC can correctly setup its routing uri, err := url.Parse(endpoint) if err != nil { diff --git a/collector/otlp/metrics.go b/collector/otlp/metrics.go index 9d9821f4d..98c3529e2 100644 --- a/collector/otlp/metrics.go +++ b/collector/otlp/metrics.go @@ -86,15 +86,12 @@ type OltpMetricWriterOpts struct { // MaxBatchSize is the maximum number of samples to send in a single batch. MaxBatchSize int - Endpoints []string - - Client remote.RemoteWriteClient + Clients []remote.RemoteWriteClient } type OltpMetricWriter struct { requestTransformer *transform.RequestTransformer - endpoints []string - remoteClient remote.RemoteWriteClient + remoteClients []remote.RemoteWriteClient maxBatchSize int disableMetricsForwarding bool } @@ -102,8 +99,7 @@ type OltpMetricWriter struct { func NewOltpMetricWriter(opts OltpMetricWriterOpts) *OltpMetricWriter { return &OltpMetricWriter{ requestTransformer: opts.RequestTransformer, - endpoints: opts.Endpoints, - remoteClient: opts.Client, + remoteClients: opts.Clients, maxBatchSize: opts.MaxBatchSize, disableMetricsForwarding: opts.DisableMetricsForwarding, } @@ -187,7 +183,7 @@ func (t *OltpMetricWriter) sendBatch(ctx context.Context, wr *prompb.WriteReques return nil } - if len(t.endpoints) == 0 || logger.IsDebug() { + if len(t.remoteClients) == 0 || logger.IsDebug() { var sb strings.Builder for _, ts := range wr.Timeseries { sb.Reset() @@ -212,14 +208,14 @@ func (t *OltpMetricWriter) sendBatch(ctx context.Context, wr *prompb.WriteReques start := time.Now() defer func() { - logger.Infof("OLTP Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(t.endpoints), time.Since(start)) + logger.Infof("OLTP Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(t.remoteClients), time.Since(start)) }() g, gCtx := errgroup.WithContext(ctx) - for _, endpoint := range t.endpoints { - endpoint := endpoint + for _, remoteClient := range t.remoteClients { + remoteClient := remoteClient g.Go(func() error { - return t.remoteClient.Write(gCtx, endpoint, wr) + return remoteClient.Write(gCtx, wr) }) } if err := g.Wait(); err != nil { diff --git a/collector/scraper.go b/collector/scraper.go index 8e3b05eac..2075b953b 100644 --- a/collector/scraper.go +++ b/collector/scraper.go @@ -64,9 +64,7 @@ type ScraperOpts struct { // MaxBatchSize is the maximum number of samples to send in a single batch. MaxBatchSize int - Endpoints []string - - RemoteClient remote.RemoteWriteClient + RemoteClients []remote.RemoteWriteClient } func (s *ScraperOpts) RequestTransformer() *transform.RequestTransformer { @@ -110,7 +108,7 @@ type Scraper struct { informerRegistration cache.ResourceEventHandlerRegistration requestTransformer *transform.RequestTransformer - remoteClient remote.RemoteWriteClient + remoteClients []remote.RemoteWriteClient scrapeClient *MetricsClient seriesCreator *seriesCreator @@ -127,7 +125,7 @@ func NewScraper(opts *ScraperOpts) *Scraper { opts: *opts, seriesCreator: &seriesCreator{}, requestTransformer: opts.RequestTransformer(), - remoteClient: opts.RemoteClient, + remoteClients: opts.RemoteClients, targets: make(map[string]ScrapeTarget), } } @@ -201,7 +199,9 @@ func (s *Scraper) scrape(ctx context.Context) { case <-t.C: s.scrapeTargets(ctx) case <-reconnectTimer.C: - s.remoteClient.CloseIdleConnections() + for _, remoteClient := range s.remoteClients { + remoteClient.CloseIdleConnections() + } } } } @@ -303,7 +303,7 @@ func (s *Scraper) sendBatch(ctx context.Context, wr *prompb.WriteRequest) error return nil } - if len(s.opts.Endpoints) == 0 || logger.IsDebug() { + if len(s.remoteClients) == 0 || logger.IsDebug() { var sb strings.Builder for _, ts := range wr.Timeseries { sb.Reset() @@ -325,14 +325,13 @@ func (s *Scraper) sendBatch(ctx context.Context, wr *prompb.WriteRequest) error start := time.Now() defer func() { - logger.Infof("Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(s.opts.Endpoints), time.Since(start)) + logger.Infof("Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(s.remoteClients), time.Since(start)) }() g, gCtx := errgroup.WithContext(ctx) - for _, endpoint := range s.opts.Endpoints { - endpoint := endpoint + for _, remoteClient := range s.remoteClients { g.Go(func() error { - return s.remoteClient.Write(gCtx, endpoint, wr) + return remoteClient.Write(gCtx, wr) }) } return g.Wait() diff --git a/collector/scraper_test.go b/collector/scraper_test.go index b1c70868d..f96f0af56 100644 --- a/collector/scraper_test.go +++ b/collector/scraper_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/Azure/adx-mon/pkg/prompb" + "github.com/Azure/adx-mon/pkg/remote" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" ) @@ -29,15 +30,13 @@ func TestScraper_sendBatch(t *testing.T) { name: "TestEmptyWriteRequest", writeRequest: &prompb.WriteRequest{}, opts: &ScraperOpts{ - Endpoints: []string{"http://fake:1234"}, - RemoteClient: &fakeClient{expectedSamples: 0}, + RemoteClients: []remote.RemoteWriteClient{&fakeClient{expectedSamples: 0}}, }, }, { name: "TestValidWriteRequest", opts: &ScraperOpts{ - Endpoints: []string{"http://fake:1234"}, - RemoteClient: &fakeClient{expectedSamples: 1}, + RemoteClients: []remote.RemoteWriteClient{&fakeClient{expectedSamples: 1}}, }, writeRequest: &prompb.WriteRequest{ Timeseries: []*prompb.TimeSeries{ @@ -56,8 +55,7 @@ func TestScraper_sendBatch(t *testing.T) { name: "TestDefaultDropMetrics", opts: &ScraperOpts{ DefaultDropMetrics: true, - Endpoints: []string{"http://fake:1234"}, - RemoteClient: &fakeClient{expectedSamples: 0}, + RemoteClients: []remote.RemoteWriteClient{&fakeClient{expectedSamples: 0}}, }, writeRequest: &prompb.WriteRequest{ Timeseries: []*prompb.TimeSeries{ @@ -80,8 +78,8 @@ func TestScraper_sendBatch(t *testing.T) { wr := s.flushBatchIfNecessary(context.Background(), tt.writeRequest) err := s.sendBatch(context.Background(), wr) require.NoError(t, err) - if tt.opts.RemoteClient.(*fakeClient).expectedSamples > 0 { - require.True(t, tt.opts.RemoteClient.(*fakeClient).called) + if tt.opts.RemoteClients[0].(*fakeClient).expectedSamples > 0 { + require.True(t, tt.opts.RemoteClients[0].(*fakeClient).called) } }) } @@ -143,7 +141,7 @@ type fakeClient struct { called bool } -func (f *fakeClient) Write(ctx context.Context, endpoint string, wr *prompb.WriteRequest) error { +func (f *fakeClient) Write(ctx context.Context, wr *prompb.WriteRequest) error { f.called = true if len(wr.Timeseries) != f.expectedSamples { return fmt.Errorf("expected %d samples, got %d", f.expectedSamples, len(wr.Timeseries)) diff --git a/collector/service.go b/collector/service.go index d9d6a49f8..34e163889 100644 --- a/collector/service.go +++ b/collector/service.go @@ -18,6 +18,7 @@ import ( "github.com/Azure/adx-mon/pkg/http" "github.com/Azure/adx-mon/pkg/logger" "github.com/Azure/adx-mon/pkg/prompb" + "github.com/Azure/adx-mon/pkg/remote" "github.com/Azure/adx-mon/pkg/service" "github.com/Azure/adx-mon/storage" "github.com/Azure/adx-mon/transform" @@ -68,7 +69,7 @@ type Service struct { type ServiceOpts struct { ListenAddr string NodeName string - Endpoints []string + Endpoint string // LogCollectionHandlers is the list of log collection handlers LogCollectionHandlers []LogCollectorOpts @@ -197,7 +198,7 @@ func NewService(opts *ServiceOpts) (*Service, error) { logsProxySvc := otlp.NewLogsProxyService(otlp.LogsProxyServiceOpts{ LiftAttributes: opts.LiftAttributes, AddAttributes: opts.AddAttributes, - Endpoints: opts.Endpoints, + Endpoint: opts.Endpoint, InsecureSkipVerify: opts.InsecureSkipVerify, }) @@ -220,8 +221,7 @@ func NewService(opts *ServiceOpts) (*Service, error) { for _, handlerOpts := range opts.OtlpMetricsHandlers { writer := otlp.NewOltpMetricWriter(otlp.OltpMetricWriterOpts{ RequestTransformer: handlerOpts.MetricOpts.RequestTransformer(), - Client: &StoreRemoteClient{store}, - Endpoints: opts.Endpoints, + Clients: []remote.RemoteWriteClient{&StoreRemoteClient{store}}, MaxBatchSize: opts.MaxBatchSize, DisableMetricsForwarding: handlerOpts.MetricOpts.DisableMetricsForwarding, }) @@ -249,11 +249,11 @@ func NewService(opts *ServiceOpts) (*Service, error) { transferQueue chan *cluster.Batch partitioner cluster.MetricPartitioner ) - if len(opts.Endpoints) > 0 { + if opts.Endpoint != "" { // This is a static partitioner that forces all entries to be assigned to the remote endpoint. partitioner = remotePartitioner{ host: "remote", - addr: opts.Endpoints[0], + addr: opts.Endpoint, } r, err := cluster.NewReplicator(cluster.ReplicatorOpts{ @@ -293,8 +293,7 @@ func NewService(opts *ServiceOpts) (*Service, error) { var scraper *Scraper if opts.Scraper != nil { scraperOpts := opts.Scraper - scraperOpts.RemoteClient = &StoreRemoteClient{store} - scraperOpts.Endpoints = opts.Endpoints + scraperOpts.RemoteClients = []remote.RemoteWriteClient{&StoreRemoteClient{store}} scraper = NewScraper(opts.Scraper) } @@ -524,7 +523,7 @@ type StoreRemoteClient struct { store storage.Store } -func (s *StoreRemoteClient) Write(ctx context.Context, endpoint string, wr *prompb.WriteRequest) error { +func (s *StoreRemoteClient) Write(ctx context.Context, wr *prompb.WriteRequest) error { return s.store.WriteTimeSeries(ctx, wr.Timeseries) } diff --git a/pkg/promremote/client.go b/pkg/promremote/client.go index 576f97def..04849eaf6 100644 --- a/pkg/promremote/client.go +++ b/pkg/promremote/client.go @@ -25,6 +25,7 @@ var ( // Client is a client for the prometheus remote write API. It is safe to be shared between goroutines. type Client struct { httpClient *http.Client + endpoint string opts ClientOpts } @@ -64,6 +65,9 @@ type ClientOpts struct { // DisableKeepAlives controls whether the client disables HTTP keep-alives. DisableKeepAlives bool + + // Endpoint for writing to the prometheus remote write API. + Endpoint string } func (c ClientOpts) WithDefaults() ClientOpts { @@ -112,6 +116,7 @@ func NewClient(opts ClientOpts) (*Client, error) { return &Client{ httpClient: httpClient, + endpoint: opts.Endpoint, opts: opts, }, nil } @@ -131,7 +136,7 @@ func (c *Client) Write(ctx context.Context, endpoint string, wr *prompb.WriteReq encoded := snappy.Encode(b1[:0], b) body := bytes.NewReader(encoded) - req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/receive", endpoint), body) + req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/receive", c.endpoint), body) if err != nil { return fmt.Errorf("new request: %w", err) } diff --git a/pkg/promremote/promremote_test.go b/pkg/promremote/promremote_test.go index d0b33167d..b01ec65c7 100644 --- a/pkg/promremote/promremote_test.go +++ b/pkg/promremote/promremote_test.go @@ -6,12 +6,13 @@ import ( "time" "github.com/Azure/adx-mon/pkg/prompb" + "github.com/Azure/adx-mon/pkg/remote" "github.com/stretchr/testify/require" ) func TestSendBatchWithValidData(t *testing.T) { client := &MockClient{} - proxy := NewRemoteWriteProxy(client, []string{"http://example.com"}, 10, false) + proxy := NewRemoteWriteProxy([]remote.RemoteWriteClient{client}, 10, false) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -41,7 +42,7 @@ func TestSendBatchWithValidData(t *testing.T) { func TestSendBatchWithEmptyBatch(t *testing.T) { client := &MockClient{} - proxy := NewRemoteWriteProxy(client, []string{"http://example.com"}, 1, false) + proxy := NewRemoteWriteProxy([]remote.RemoteWriteClient{client}, 1, false) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -76,6 +77,6 @@ type MockClient struct{} func (m *MockClient) CloseIdleConnections() {} -func (m *MockClient) Write(ctx context.Context, endpoint string, wr *prompb.WriteRequest) error { +func (m *MockClient) Write(ctx context.Context, wr *prompb.WriteRequest) error { return nil } diff --git a/pkg/promremote/proxy.go b/pkg/promremote/proxy.go index 86042b463..6739182b7 100644 --- a/pkg/promremote/proxy.go +++ b/pkg/promremote/proxy.go @@ -15,8 +15,7 @@ import ( ) type RemoteWriteProxy struct { - client remote.RemoteWriteClient - endpoints []string + clients []remote.RemoteWriteClient maxBatchSize int disableMetricsForwarding bool @@ -28,10 +27,9 @@ type RemoteWriteProxy struct { cancelFn context.CancelFunc } -func NewRemoteWriteProxy(client remote.RemoteWriteClient, endpoints []string, maxBatchSize int, disableMetricsForwarding bool) *RemoteWriteProxy { +func NewRemoteWriteProxy(clients []remote.RemoteWriteClient, maxBatchSize int, disableMetricsForwarding bool) *RemoteWriteProxy { p := &RemoteWriteProxy{ - client: client, - endpoints: endpoints, + clients: clients, maxBatchSize: maxBatchSize, disableMetricsForwarding: disableMetricsForwarding, queue: make(chan *prompb.WriteRequest, 100), @@ -138,7 +136,7 @@ func (p *RemoteWriteProxy) sendBatch(ctx context.Context) error { return bytes.Compare(prompb.MetricName(wr.Timeseries[i]), prompb.MetricName(wr.Timeseries[j])) < 0 }) - if len(p.endpoints) == 0 || logger.IsDebug() { + if len(p.clients) == 0 || logger.IsDebug() { var sb strings.Builder for _, ts := range wr.Timeseries { sb.Reset() @@ -160,13 +158,13 @@ func (p *RemoteWriteProxy) sendBatch(ctx context.Context) error { start := time.Now() g, gCtx := errgroup.WithContext(ctx) - for _, endpoint := range p.endpoints { - endpoint := endpoint + for _, client := range p.clients { + client := client g.Go(func() error { - return p.client.Write(gCtx, endpoint, wr) + return client.Write(gCtx, wr) }) } - logger.Infof("Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(p.endpoints), time.Since(start)) + logger.Infof("Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(p.clients), time.Since(start)) if err := g.Wait(); err != nil { logger.Errorf("Error sending batch: %v", err) } diff --git a/pkg/remote/types.go b/pkg/remote/types.go index 7525bfb04..094faf228 100644 --- a/pkg/remote/types.go +++ b/pkg/remote/types.go @@ -7,6 +7,6 @@ import ( ) type RemoteWriteClient interface { - Write(ctx context.Context, endpoint string, wr *prompb.WriteRequest) error + Write(ctx context.Context, wr *prompb.WriteRequest) error CloseIdleConnections() } diff --git a/tools/cmd/write-load/main.go b/tools/cmd/write-load/main.go index fe48ffc27..aaca3dc20 100644 --- a/tools/cmd/write-load/main.go +++ b/tools/cmd/write-load/main.go @@ -145,6 +145,7 @@ func writer(ctx context.Context, endpoint string, stats *stats, ch chan *prompb. cli, err := promremote.NewClient(promremote.ClientOpts{ InsecureSkipVerify: true, Timeout: 30 * time.Second, + Endpoint: endpoint, }) if err != nil { logger.Fatalf("prom client: %v", err)