Skip to content

Commit

Permalink
Merge branch 'main' into jesse/split-nightlies
Browse files Browse the repository at this point in the history
  • Loading branch information
jessejlt authored Jan 7, 2025
2 parents 7d01b55 + 9577216 commit db151f5
Show file tree
Hide file tree
Showing 129 changed files with 14,756 additions and 12,672 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ import (
"fmt"
"strings"

"github.com/Azure/adx-mon/alerter/alert"
"github.com/Azure/adx-mon/alerter/engine"
"github.com/Azure/adx-mon/metrics"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/data/table"
)

type multiKustoClient struct {
clients map[string]*kusto.Client
clients map[string]QueryClient
maxNotifications int
}

func New(endpoints map[string]string, configureAuth authConfiguror, max int) (multiKustoClient, error) {
clients := make(map[string]*kusto.Client)
clients := make(map[string]QueryClient)
for name, endpoint := range endpoints {
kcsb := kusto.NewConnectionStringBuilder(endpoint)
if strings.HasPrefix(endpoint, "https://") {
Expand Down Expand Up @@ -60,17 +60,14 @@ func (c multiKustoClient) Query(ctx context.Context, qc *engine.QueryContext, fn
if err := iter.Do(func(row *table.Row) error {
n++
if n > c.maxNotifications {
metrics.NotificationUnhealthy.WithLabelValues(qc.Rule.Namespace, qc.Rule.Name).Set(1)
return fmt.Errorf("%s/%s returned more than %d icm, throttling query", qc.Rule.Namespace, qc.Rule.Name, c.maxNotifications)
return fmt.Errorf("%s/%s returned more than %d icm, throttling query. %w", qc.Rule.Namespace, qc.Rule.Name, c.maxNotifications, alert.ErrTooManyRequests)
}

return fn(ctx, client.Endpoint(), qc, row)
}); err != nil {
return err, 0
}

// reset health metric since we didn't get any errors
metrics.NotificationUnhealthy.WithLabelValues(qc.Rule.Namespace, qc.Rule.Name).Set(0)
return nil, n
}

Expand All @@ -81,3 +78,9 @@ func (c multiKustoClient) Endpoint(db string) string {
}
return cl.Endpoint()
}

type QueryClient interface {
Query(ctx context.Context, db string, query kusto.Statement, options ...kusto.QueryOption) (*kusto.RowIterator, error)
Mgmt(ctx context.Context, db string, query kusto.Statement, options ...kusto.MgmtOption) (*kusto.RowIterator, error)
Endpoint() string
}
255 changes: 255 additions & 0 deletions alerter/multikustoclient/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
package multikustoclient

import (
"context"
"errors"
"testing"

"github.com/Azure/adx-mon/alerter/engine"
"github.com/Azure/adx-mon/alerter/rules"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/data/table"
"github.com/Azure/azure-kusto-go/kusto/data/types"
"github.com/Azure/azure-kusto-go/kusto/data/value"
"github.com/Azure/azure-kusto-go/kusto/unsafe"
"github.com/stretchr/testify/require"
)

type fakeQueryClient struct {
nextQueryIter *kusto.RowIterator
nextQueryErr error

nextMgmtIter *kusto.RowIterator
nextMgmtErr error

endpoint string
}

func (f *fakeQueryClient) Query(ctx context.Context, db string, query kusto.Statement, options ...kusto.QueryOption) (*kusto.RowIterator, error) {
return f.nextQueryIter, f.nextQueryErr
}

func (f *fakeQueryClient) Mgmt(ctx context.Context, db string, query kusto.Statement, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
return f.nextMgmtIter, f.nextMgmtErr
}

func (f *fakeQueryClient) Endpoint() string {
return f.endpoint
}

func TestQuery(t *testing.T) {
maxNotifications := 5

type testcase struct {
name string
rows *kusto.MockRows
rule *rules.Rule
queryErr error
callbackErr error
expectedSent int
expectError bool
}

testcases := []testcase{
{
name: "Query with no rows",
rows: newRows(t, []string{}),
rule: &rules.Rule{
Database: "dbOne",
},
queryErr: nil,
callbackErr: nil,
expectedSent: 0,
expectError: false,
},
{
name: "Two rows",
rows: newRows(t, []string{
"rowOne",
"rowTwo",
}),
rule: &rules.Rule{
Database: "dbOne",
},
queryErr: nil,
callbackErr: nil,
expectedSent: 2,
expectError: false,
},
{
name: "Max notifications",
rows: newRows(t, []string{
"rowOne",
"rowTwo",
"rowThree",
"rowFour",
"rowFive",
}),
rule: &rules.Rule{
Database: "dbOne",
},
queryErr: nil,
callbackErr: nil,
expectedSent: 5,
expectError: false,
},
{
name: "Over max notifications",
rows: newRows(t, []string{
"rowOne",
"rowTwo",
"rowThree",
"rowFour",
"rowFive",
"rowSix",
}),
rule: &rules.Rule{
Database: "dbOne",
},
queryErr: nil,
callbackErr: nil,
expectedSent: 5, // first 5 sent, then error
expectError: true,
},
{
name: "Unknown db",
rows: newRows(t, []string{}),
rule: &rules.Rule{
Database: "dbUnknown",
},
queryErr: nil,
callbackErr: nil,
expectedSent: 0,
expectError: true,
},
{
name: "Client query error",
rows: newRows(t, []string{}),
rule: &rules.Rule{
Database: "dbOne",
},
queryErr: errors.New("query error"),
callbackErr: nil,
expectedSent: 0,
expectError: true,
},
{
name: "Callback error",
rows: newRows(t, []string{
"rowOne",
"rowTwo",
}),
rule: &rules.Rule{
Database: "dbOne",
},
queryErr: nil,
callbackErr: errors.New("callback error"),
expectedSent: 1, // still attempts to send first, bails out
expectError: true,
},
{
name: "Client mgmt query error",
rows: newRows(t, []string{}),
rule: &rules.Rule{
Database: "dbOne",
IsMgmtQuery: true,
},
queryErr: errors.New("query error"),
callbackErr: nil,
expectedSent: 0,
expectError: true,
},
{
name: "Query with no rows mgmt query",
rows: newRows(t, []string{}),
rule: &rules.Rule{
Database: "dbOne",
IsMgmtQuery: true,
},
queryErr: nil,
callbackErr: nil,
expectedSent: 0,
expectError: false,
},
{
name: "Two rows mgmt query",
rows: newRows(t, []string{
"rowOne",
"rowTwo",
}),
rule: &rules.Rule{
Database: "dbOne",
IsMgmtQuery: true,
},
queryErr: nil,
callbackErr: nil,
expectedSent: 2,
expectError: false,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
rowIterator := &kusto.RowIterator{}
err := rowIterator.Mock(tc.rows)
require.NoError(t, err)

var client QueryClient
if tc.rule.IsMgmtQuery {
client = &fakeQueryClient{
nextMgmtIter: rowIterator,
nextMgmtErr: tc.queryErr,
endpoint: "endpointOne",
}
} else {
client = &fakeQueryClient{
nextQueryIter: rowIterator,
nextQueryErr: tc.queryErr,
endpoint: "endpointOne",
}
}

multiKustoClient := multiKustoClient{
clients: map[string]QueryClient{
"dbOne": client,
},
maxNotifications: maxNotifications,
}

ctx := context.Background()
queryContext := &engine.QueryContext{
Rule: tc.rule,
Stmt: kusto.NewStmt(``, kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd("query"),
}

callbackCounter := 0
callback := func(context.Context, string, *engine.QueryContext, *table.Row) error {
callbackCounter++
return tc.callbackErr
}

err, _ = multiKustoClient.Query(ctx, queryContext, callback)

require.Equal(t, tc.expectedSent, callbackCounter)
if tc.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}

func newRows(t *testing.T, values []string) *kusto.MockRows {
t.Helper()

rows, err := kusto.NewMockRows(table.Columns{
{Name: "columnOne", Type: types.String},
})
require.NoError(t, err)
for _, val := range values {
err = rows.Row(value.Values{value.String{Value: val, Valid: true}})
require.NoError(t, err)
}
return rows
}
10 changes: 6 additions & 4 deletions collector/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,16 @@ func (s *Scraper) scrapeTargets(ctx context.Context) {
logger.Infof("Scraping %s", target.String())
iter, err := s.scrapeClient.FetchMetricsIterator(target.Addr)
if err != nil {
logger.Errorf("Failed to scrape %s: %s", target.Addr, err.Error())
logger.Warnf("Failed to create scrape iterator %s/%s/%s at %s: %s",
target.Namespace, target.Pod, target.Container, target.Addr, err.Error())
continue
}
for iter.Next() {
pt := prompb.TimeSeriesPool.Get()
ts, err := iter.TimeSeriesInto(pt)
if err != nil {
logger.Errorf("Failed to parse series %s: %s", target.Addr, err.Error())
logger.Warnf("Failed to parse series %s/%s/%s at %s: %s",
target.Namespace, target.Pod, target.Container, target.Addr, err.Error())
continue
}

Expand Down Expand Up @@ -261,11 +263,11 @@ func (s *Scraper) scrapeTargets(ctx context.Context) {
wr = s.flushBatchIfNecessary(ctx, wr)
}
if err := iter.Err(); err != nil {
logger.Errorf("Failed to scrape %s: %s", target.Addr, err.Error())
logger.Warnf("Failed to scrape %s/%s/%s at %s: %s", target.Namespace, target.Pod, target.Container, target.Addr, err.Error())
}

if err := iter.Close(); err != nil {
logger.Errorf("Failed to close iterator: %s", err.Error())
logger.Errorf("Failed to close iterator for %s/%s/%s: %s", target.Namespace, target.Pod, target.Container, err.Error())
}

wr = s.flushBatchIfNecessary(ctx, wr)
Expand Down
Loading

0 comments on commit db151f5

Please sign in to comment.