Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transition metric writers away from accepting endpoint arg #505

Merged
merged 1 commit into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,20 @@ func realMain(ctx *cli.Context) error {

cfg.ReplaceVariable("$(HOSTNAME)", hostname)

var endpoints []string
var endpoint string
if cfg.Endpoint != "" {
for _, endpoint := range []string{cfg.Endpoint} {
u, err := url.Parse(endpoint)
if err != nil {
return fmt.Errorf("failed to parse endpoint %s: %w", endpoint, err)
}
endpoint = cfg.Endpoint

if u.Scheme != "http" && u.Scheme != "https" {
return fmt.Errorf("endpoint %s must be http or https", endpoint)
}
u, err := url.Parse(cfg.Endpoint)
if err != nil {
return fmt.Errorf("failed to parse endpoint %s: %w", endpoint, err)
}

logger.Infof("Using remote write endpoint %s", endpoint)
endpoints = append(endpoints, endpoint)
if u.Scheme != "http" && u.Scheme != "https" {
return fmt.Errorf("endpoint %s must be http or https", endpoint)
}

logger.Infof("Using remote write endpoint %s", endpoint)
}

if cfg.StorageDir == "" {
Expand Down Expand Up @@ -298,7 +297,7 @@ func realMain(ctx *cli.Context) error {
Scraper: scraperOpts,
ListenAddr: cfg.ListenAddr,
NodeName: hostname,
Endpoints: endpoints,
Endpoint: endpoint,
LiftLabels: sortedLiftedLabels,
AddAttributes: addAttributes,
LiftAttributes: liftAttributes,
Expand Down
5 changes: 3 additions & 2 deletions collector/otlp/logs_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
type LogsProxyServiceOpts struct {
AddAttributes map[string]string
LiftAttributes []string
Endpoints []string
Endpoint string
InsecureSkipVerify bool
}

Expand Down Expand Up @@ -60,7 +60,8 @@ func NewLogsProxyService(opts LogsProxyServiceOpts) *LogsProxyService {
}

rpcClients := make(map[string]logsv1connect.LogsServiceClient)
for _, endpoint := range opts.Endpoints {
if opts.Endpoint != "" {
endpoint := opts.Endpoint
// We have to strip the path component from our endpoint so gRPC can correctly set up its routing
uri, err := url.Parse(endpoint)
if err != nil {
Expand Down
20 changes: 8 additions & 12 deletions collector/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,24 +86,20 @@ type OltpMetricWriterOpts struct {
// MaxBatchSize is the maximum number of samples to send in a single batch.
MaxBatchSize int

Endpoints []string

Client remote.RemoteWriteClient
Clients []remote.RemoteWriteClient
}

type OltpMetricWriter struct {
requestTransformer *transform.RequestTransformer
endpoints []string
remoteClient remote.RemoteWriteClient
remoteClients []remote.RemoteWriteClient
maxBatchSize int
disableMetricsForwarding bool
}

func NewOltpMetricWriter(opts OltpMetricWriterOpts) *OltpMetricWriter {
return &OltpMetricWriter{
requestTransformer: opts.RequestTransformer,
endpoints: opts.Endpoints,
remoteClient: opts.Client,
remoteClients: opts.Clients,
maxBatchSize: opts.MaxBatchSize,
disableMetricsForwarding: opts.DisableMetricsForwarding,
}
Expand Down Expand Up @@ -187,7 +183,7 @@ func (t *OltpMetricWriter) sendBatch(ctx context.Context, wr *prompb.WriteReques
return nil
}

if len(t.endpoints) == 0 || logger.IsDebug() {
if len(t.remoteClients) == 0 || logger.IsDebug() {
var sb strings.Builder
for _, ts := range wr.Timeseries {
sb.Reset()
Expand All @@ -212,14 +208,14 @@ func (t *OltpMetricWriter) sendBatch(ctx context.Context, wr *prompb.WriteReques

start := time.Now()
defer func() {
logger.Infof("OLTP Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(t.endpoints), time.Since(start))
logger.Infof("OLTP Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(t.remoteClients), time.Since(start))
}()

g, gCtx := errgroup.WithContext(ctx)
for _, endpoint := range t.endpoints {
endpoint := endpoint
for _, remoteClient := range t.remoteClients {
remoteClient := remoteClient
g.Go(func() error {
return t.remoteClient.Write(gCtx, endpoint, wr)
return remoteClient.Write(gCtx, wr)
})
}
if err := g.Wait(); err != nil {
Expand Down
21 changes: 10 additions & 11 deletions collector/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ type ScraperOpts struct {
// MaxBatchSize is the maximum number of samples to send in a single batch.
MaxBatchSize int

Endpoints []string

RemoteClient remote.RemoteWriteClient
RemoteClients []remote.RemoteWriteClient
}

func (s *ScraperOpts) RequestTransformer() *transform.RequestTransformer {
Expand Down Expand Up @@ -110,7 +108,7 @@ type Scraper struct {
informerRegistration cache.ResourceEventHandlerRegistration

requestTransformer *transform.RequestTransformer
remoteClient remote.RemoteWriteClient
remoteClients []remote.RemoteWriteClient
scrapeClient *MetricsClient
seriesCreator *seriesCreator

Expand All @@ -127,7 +125,7 @@ func NewScraper(opts *ScraperOpts) *Scraper {
opts: *opts,
seriesCreator: &seriesCreator{},
requestTransformer: opts.RequestTransformer(),
remoteClient: opts.RemoteClient,
remoteClients: opts.RemoteClients,
targets: make(map[string]ScrapeTarget),
}
}
Expand Down Expand Up @@ -201,7 +199,9 @@ func (s *Scraper) scrape(ctx context.Context) {
case <-t.C:
s.scrapeTargets(ctx)
case <-reconnectTimer.C:
s.remoteClient.CloseIdleConnections()
for _, remoteClient := range s.remoteClients {
remoteClient.CloseIdleConnections()
}
}
}
}
Expand Down Expand Up @@ -303,7 +303,7 @@ func (s *Scraper) sendBatch(ctx context.Context, wr *prompb.WriteRequest) error
return nil
}

if len(s.opts.Endpoints) == 0 || logger.IsDebug() {
if len(s.remoteClients) == 0 || logger.IsDebug() {
var sb strings.Builder
for _, ts := range wr.Timeseries {
sb.Reset()
Expand All @@ -325,14 +325,13 @@ func (s *Scraper) sendBatch(ctx context.Context, wr *prompb.WriteRequest) error

start := time.Now()
defer func() {
logger.Infof("Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(s.opts.Endpoints), time.Since(start))
logger.Infof("Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(s.remoteClients), time.Since(start))
}()

g, gCtx := errgroup.WithContext(ctx)
for _, endpoint := range s.opts.Endpoints {
endpoint := endpoint
for _, remoteClient := range s.remoteClients {
g.Go(func() error {
return s.remoteClient.Write(gCtx, endpoint, wr)
return remoteClient.Write(gCtx, wr)
})
}
return g.Wait()
Expand Down
16 changes: 7 additions & 9 deletions collector/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/Azure/adx-mon/pkg/prompb"
"github.com/Azure/adx-mon/pkg/remote"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
)
Expand All @@ -29,15 +30,13 @@ func TestScraper_sendBatch(t *testing.T) {
name: "TestEmptyWriteRequest",
writeRequest: &prompb.WriteRequest{},
opts: &ScraperOpts{
Endpoints: []string{"http://fake:1234"},
RemoteClient: &fakeClient{expectedSamples: 0},
RemoteClients: []remote.RemoteWriteClient{&fakeClient{expectedSamples: 0}},
},
},
{
name: "TestValidWriteRequest",
opts: &ScraperOpts{
Endpoints: []string{"http://fake:1234"},
RemoteClient: &fakeClient{expectedSamples: 1},
RemoteClients: []remote.RemoteWriteClient{&fakeClient{expectedSamples: 1}},
},
writeRequest: &prompb.WriteRequest{
Timeseries: []*prompb.TimeSeries{
Expand All @@ -56,8 +55,7 @@ func TestScraper_sendBatch(t *testing.T) {
name: "TestDefaultDropMetrics",
opts: &ScraperOpts{
DefaultDropMetrics: true,
Endpoints: []string{"http://fake:1234"},
RemoteClient: &fakeClient{expectedSamples: 0},
RemoteClients: []remote.RemoteWriteClient{&fakeClient{expectedSamples: 0}},
},
writeRequest: &prompb.WriteRequest{
Timeseries: []*prompb.TimeSeries{
Expand All @@ -80,8 +78,8 @@ func TestScraper_sendBatch(t *testing.T) {
wr := s.flushBatchIfNecessary(context.Background(), tt.writeRequest)
err := s.sendBatch(context.Background(), wr)
require.NoError(t, err)
if tt.opts.RemoteClient.(*fakeClient).expectedSamples > 0 {
require.True(t, tt.opts.RemoteClient.(*fakeClient).called)
if tt.opts.RemoteClients[0].(*fakeClient).expectedSamples > 0 {
require.True(t, tt.opts.RemoteClients[0].(*fakeClient).called)
}
})
}
Expand Down Expand Up @@ -143,7 +141,7 @@ type fakeClient struct {
called bool
}

func (f *fakeClient) Write(ctx context.Context, endpoint string, wr *prompb.WriteRequest) error {
func (f *fakeClient) Write(ctx context.Context, wr *prompb.WriteRequest) error {
f.called = true
if len(wr.Timeseries) != f.expectedSamples {
return fmt.Errorf("expected %d samples, got %d", f.expectedSamples, len(wr.Timeseries))
Expand Down
17 changes: 8 additions & 9 deletions collector/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/Azure/adx-mon/pkg/http"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/prompb"
"github.com/Azure/adx-mon/pkg/remote"
"github.com/Azure/adx-mon/pkg/service"
"github.com/Azure/adx-mon/storage"
"github.com/Azure/adx-mon/transform"
Expand Down Expand Up @@ -68,7 +69,7 @@ type Service struct {
type ServiceOpts struct {
ListenAddr string
NodeName string
Endpoints []string
Endpoint string

// LogCollectionHandlers is the list of log collection handlers
LogCollectionHandlers []LogCollectorOpts
Expand Down Expand Up @@ -197,7 +198,7 @@ func NewService(opts *ServiceOpts) (*Service, error) {
logsProxySvc := otlp.NewLogsProxyService(otlp.LogsProxyServiceOpts{
LiftAttributes: opts.LiftAttributes,
AddAttributes: opts.AddAttributes,
Endpoints: opts.Endpoints,
Endpoint: opts.Endpoint,
InsecureSkipVerify: opts.InsecureSkipVerify,
})

Expand All @@ -220,8 +221,7 @@ func NewService(opts *ServiceOpts) (*Service, error) {
for _, handlerOpts := range opts.OtlpMetricsHandlers {
writer := otlp.NewOltpMetricWriter(otlp.OltpMetricWriterOpts{
RequestTransformer: handlerOpts.MetricOpts.RequestTransformer(),
Client: &StoreRemoteClient{store},
Endpoints: opts.Endpoints,
Clients: []remote.RemoteWriteClient{&StoreRemoteClient{store}},
MaxBatchSize: opts.MaxBatchSize,
DisableMetricsForwarding: handlerOpts.MetricOpts.DisableMetricsForwarding,
})
Expand Down Expand Up @@ -249,11 +249,11 @@ func NewService(opts *ServiceOpts) (*Service, error) {
transferQueue chan *cluster.Batch
partitioner cluster.MetricPartitioner
)
if len(opts.Endpoints) > 0 {
if opts.Endpoint != "" {
// This is a static partitioner that forces all entries to be assigned to the remote endpoint.
partitioner = remotePartitioner{
host: "remote",
addr: opts.Endpoints[0],
addr: opts.Endpoint,
}

r, err := cluster.NewReplicator(cluster.ReplicatorOpts{
Expand Down Expand Up @@ -293,8 +293,7 @@ func NewService(opts *ServiceOpts) (*Service, error) {
var scraper *Scraper
if opts.Scraper != nil {
scraperOpts := opts.Scraper
scraperOpts.RemoteClient = &StoreRemoteClient{store}
scraperOpts.Endpoints = opts.Endpoints
scraperOpts.RemoteClients = []remote.RemoteWriteClient{&StoreRemoteClient{store}}

scraper = NewScraper(opts.Scraper)
}
Expand Down Expand Up @@ -524,7 +523,7 @@ type StoreRemoteClient struct {
store storage.Store
}

func (s *StoreRemoteClient) Write(ctx context.Context, endpoint string, wr *prompb.WriteRequest) error {
func (s *StoreRemoteClient) Write(ctx context.Context, wr *prompb.WriteRequest) error {
return s.store.WriteTimeSeries(ctx, wr.Timeseries)
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/promremote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
// Client is a client for the Prometheus remote write API. It is safe to be shared between goroutines.
type Client struct {
httpClient *http.Client
endpoint string
opts ClientOpts
}

Expand Down Expand Up @@ -64,6 +65,9 @@ type ClientOpts struct {

// DisableKeepAlives controls whether the client disables HTTP keep-alives.
DisableKeepAlives bool

// Endpoint is the base URL of the Prometheus remote write API; Write posts to "<Endpoint>/receive".
Endpoint string
}

func (c ClientOpts) WithDefaults() ClientOpts {
Expand Down Expand Up @@ -112,6 +116,7 @@ func NewClient(opts ClientOpts) (*Client, error) {

return &Client{
httpClient: httpClient,
endpoint: opts.Endpoint,
opts: opts,
}, nil
}
Expand All @@ -131,7 +136,7 @@ func (c *Client) Write(ctx context.Context, endpoint string, wr *prompb.WriteReq
encoded := snappy.Encode(b1[:0], b)
body := bytes.NewReader(encoded)

req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/receive", endpoint), body)
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/receive", c.endpoint), body)
if err != nil {
return fmt.Errorf("new request: %w", err)
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/promremote/promremote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"time"

"github.com/Azure/adx-mon/pkg/prompb"
"github.com/Azure/adx-mon/pkg/remote"
"github.com/stretchr/testify/require"
)

func TestSendBatchWithValidData(t *testing.T) {
client := &MockClient{}
proxy := NewRemoteWriteProxy(client, []string{"http://example.com"}, 10, false)
proxy := NewRemoteWriteProxy([]remote.RemoteWriteClient{client}, 10, false)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -41,7 +42,7 @@ func TestSendBatchWithValidData(t *testing.T) {

func TestSendBatchWithEmptyBatch(t *testing.T) {
client := &MockClient{}
proxy := NewRemoteWriteProxy(client, []string{"http://example.com"}, 1, false)
proxy := NewRemoteWriteProxy([]remote.RemoteWriteClient{client}, 1, false)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -76,6 +77,6 @@ type MockClient struct{}

func (m *MockClient) CloseIdleConnections() {}

func (m *MockClient) Write(ctx context.Context, endpoint string, wr *prompb.WriteRequest) error {
func (m *MockClient) Write(ctx context.Context, wr *prompb.WriteRequest) error {
return nil
}
Loading
Loading