Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize allocations on pass of offsets to pipeline #727

Merged
merged 2 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pipeline/antispam/antispammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Antispammer struct {
threshold int
maintenanceInterval time.Duration
mu sync.RWMutex
sources map[any]source
sources map[string]source
exceptions Exceptions

logger *zap.Logger
Expand Down Expand Up @@ -63,7 +63,7 @@ func NewAntispammer(o *Options) *Antispammer {
unbanIterations: o.UnbanIterations,
threshold: o.Threshold,
maintenanceInterval: o.MaintenanceInterval,
sources: make(map[any]source),
sources: make(map[string]source),
exceptions: o.Exceptions,
logger: o.Logger,
activeMetric: o.MetricsController.RegisterGauge("antispam_active",
Expand All @@ -82,7 +82,7 @@ func NewAntispammer(o *Options) *Antispammer {
return a
}

func (a *Antispammer) IsSpam(id any, name string, isNewSource bool, event []byte, timeEvent time.Time) bool {
func (a *Antispammer) IsSpam(id string, name string, isNewSource bool, event []byte, timeEvent time.Time) bool {
if a.threshold <= 0 {
return false
}
Expand Down
6 changes: 3 additions & 3 deletions pipeline/antispam/antispammer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestAntispam(t *testing.T) {
startTime := time.Now()
checkSpam := func(i int) bool {
eventTime := startTime.Add(time.Duration(i) * maintenanceInterval / 2)
return antispamer.IsSpam(1, "test", false, []byte(`{}`), eventTime)
return antispamer.IsSpam("1", "test", false, []byte(`{}`), eventTime)
}

for i := 1; i < threshold; i++ {
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestAntispamAfterRestart(t *testing.T) {
startTime := time.Now()
checkSpam := func(i int) bool {
eventTime := startTime.Add(time.Duration(i) * maintenanceInterval)
return antispamer.IsSpam(1, "test", false, []byte(`{}`), eventTime)
return antispamer.IsSpam("1", "test", false, []byte(`{}`), eventTime)
}

for i := 1; i < threshold; i++ {
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestAntispamExceptions(t *testing.T) {
antispamer.exceptions.Prepare()

checkSpam := func(source, event string, wantMetric map[string]float64) {
antispamer.IsSpam(1, source, true, []byte(event), now)
antispamer.IsSpam("1", source, true, []byte(event), now)
for k, v := range wantMetric {
r.Equal(v, testutil.ToFloat64(antispamer.exceptionMetric.WithLabelValues(k)))
}
Expand Down
21 changes: 21 additions & 0 deletions pipeline/offsets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package pipeline

// Offsets bundles the current offset with a set of per-stream offsets.
type Offsets struct {
	// current is the single offset value carried alongside the per-stream ones.
	current int64
	// streamOffsets holds one offset per stream name; byStream looks it up.
	streamOffsets SliceMap
}

// NewOffsets builds an Offsets value from the current offset and the
// per-stream offsets. The SliceMap is stored as given, without copying.
func NewOffsets(current int64, streamOffsets SliceMap) Offsets {
	var o Offsets
	o.current = current
	o.streamOffsets = streamOffsets
	return o
}

// byStream returns the offset recorded for the given stream, or -1 when the
// stream has no entry in streamOffsets.
func (o Offsets) byStream(stream string) int64 {
	if offset, ok := o.streamOffsets.Get(StreamName(stream)); ok {
		return offset
	}
	return -1
}
27 changes: 27 additions & 0 deletions pipeline/offsets_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package pipeline

import "testing"

// TestOffset checks that Offsets keeps the current offset it was built with
// and that byStream returns the stored per-stream offset, or -1 for a stream
// that has no entry.
func TestOffset(t *testing.T) {
	offsets := SliceFromMap(map[StreamName]int64{
		"stream1": 100,
		"stream2": 200,
	})

	offset := NewOffsets(42, offsets)

	// The current offset is stored as-is.
	if got := offset.current; got != 42 {
		t.Errorf("current = %v; want 42", got)
	}

	// Known streams yield their stored offset; unknown streams yield -1.
	cases := []struct {
		stream string
		want   int64
	}{
		{stream: "stream1", want: 100},
		{stream: "stream2", want: 200},
		{stream: "stream3", want: -1},
	}
	for _, tc := range cases {
		if got := offset.byStream(tc.stream); got != tc.want {
			t.Errorf("byStream(%q) = %v; want %v", tc.stream, got, tc.want)
		}
	}
}
25 changes: 10 additions & 15 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
type finalizeFn = func(event *Event, notifyInput bool, backEvent bool)

type InputPluginController interface {
In(sourceID SourceID, sourceName string, offset Offsets, data []byte, isNewSource bool, meta metadata.MetaData) uint64
In(sourceID SourceID, sourceName string, offsets Offsets, data []byte, isNewSource bool, meta metadata.MetaData) uint64
UseSpread() // don't use stream field and spread all events across all processors
DisableStreams() // don't use stream field
SuggestDecoder(t decoder.Type) // set decoder type if pipeline uses "auto" value for decoder
Expand Down Expand Up @@ -412,13 +412,8 @@ func (p *Pipeline) GetOutput() OutputPlugin {
return p.output
}

type Offsets interface {
Current() int64
ByStream(stream string) int64
}

// In decodes message and passes it to event stream.
func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64) {
func (p *Pipeline) In(sourceID SourceID, sourceName string, offsets Offsets, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64) {
var (
ok bool
cutoff bool
Expand Down Expand Up @@ -451,7 +446,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte
row, err = decoder.DecodeCRI(bytes)
if err != nil {
p.wrongEventCRIFormatMetric.Inc()
p.Error(fmt.Sprintf("wrong cri format offset=%d, length=%d, err=%s, source=%d:%s, cri=%s", offset.Current(), length, err.Error(), sourceID, sourceName, bytes))
p.Error(fmt.Sprintf("wrong cri format offset=%d, length=%d, err=%s, source=%d:%s, cri=%s", offsets.current, length, err.Error(), sourceID, sourceName, bytes))
return EventSeqIDError
}
}
Expand All @@ -465,17 +460,17 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte
// For example, for containerd this setting is called max_container_log_line_size
// https://github.com/containerd/containerd/blob/f7f2be732159a411eae46b78bfdb479b133a823b/pkg/cri/config/config.go#L263-L266
if !row.IsPartial && p.settings.AntispamThreshold > 0 {
streamOffset := offset.ByStream(string(row.Stream))
currentOffset := offset.Current()
streamOffset := offsets.byStream(string(row.Stream))
currentOffset := offsets.current

if streamOffset > 0 && currentOffset < streamOffset {
return EventSeqIDError
}

var checkSourceID any
var checkSourceID string
var checkSourceName string
if p.settings.SourceNameMetaField == "" {
checkSourceID = uint64(sourceID)
checkSourceID = strconv.FormatUint(uint64(sourceID), 10)
checkSourceName = sourceName
} else {
if val, ok := meta[p.settings.SourceNameMetaField]; ok {
Expand All @@ -484,7 +479,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte
isNewSource = false
} else {
p.Error(fmt.Sprintf("source_name_meta_field %q does not exists in meta", p.settings.SourceNameMetaField))
checkSourceID = uint64(sourceID)
checkSourceID = strconv.FormatUint(uint64(sourceID), 10)
checkSourceName = sourceName
}
}
Expand Down Expand Up @@ -539,7 +534,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte
}

p.logger.Log(level, "wrong log format", zap.Error(err),
zap.Int64("offset", offset.Current()),
zap.Int64("offset", offsets.current),
zap.Int("length", length),
zap.Uint64("source", uint64(sourceID)),
zap.String("source_name", sourceName),
Expand Down Expand Up @@ -570,7 +565,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte
event.Root.AddFieldNoAlloc(event.Root, p.settings.CutOffEventByLimitField).MutateToBool(true)
}

event.Offset = offset.Current()
event.Offset = offsets.current
event.SourceID = sourceID
event.SourceName = sourceName
event.streamName = DefaultStreamName
Expand Down
4 changes: 2 additions & 2 deletions pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestInInvalidMessages(t *testing.T) {

pipe.SetInput(getFakeInputInfo())

seqID := pipe.In(tCase.sourceID, "kafka", test.Offset(tCase.offset), tCase.message, false, nil)
seqID := pipe.In(tCase.sourceID, "kafka", test.NewOffset(tCase.offset), tCase.message, false, nil)
require.Equal(t, pipeline.EventSeqIDError, seqID)
})
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func BenchmarkMetaTemplater(b *testing.B) {
"/k8s-logs/advanced-logs-checker-1566485760-trtrq-%d_sre-%d_duty-bot-4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0%d.log",
rest, rest, rest,
),
test.Offset(i),
test.NewOffset(int64(i)),
[]byte("2016-10-06T00:17:09.669794202Z stdout P partial content 1\n"),
false,
metadata.MetaData{},
Expand Down
39 changes: 39 additions & 0 deletions pipeline/slicemap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package pipeline

// SliceMap is a small slice-backed association of stream name to offset.
//
// A plain map[k]v would be the obvious choice, but, per the original author,
// the built-in map implementation in Go > 1.17 cannot work with keys whose
// bytes may be mutated later, which happens when a string is produced by an
// unsafe cast from []byte.
// A linear scan over a slice is also expected to be no slower than a map for
// the 1-2 keys that are the common case for streams per job.
type SliceMap []kv

// kv is a single SliceMap entry: a stream name paired with its offset.
type kv struct {
	Stream StreamName
	Offset int64
}

// SliceFromMap converts a map of stream offsets into a SliceMap with one
// entry per map key. Entry order follows map iteration and is therefore
// unspecified.
func SliceFromMap(m map[StreamName]int64) SliceMap {
	entries := make(SliceMap, 0, len(m))
	for name, offset := range m {
		entry := kv{Stream: name, Offset: offset}
		entries = append(entries, entry)
	}
	return entries
}

// Get returns the offset stored for streamName and true, or 0 and false when
// the SliceMap has no entry for that stream.
func (so *SliceMap) Get(streamName StreamName) (int64, bool) {
	entries := *so
	for i := range entries {
		if entries[i].Stream != streamName {
			continue
		}
		return entries[i].Offset, true
	}
	return 0, false
}

// Set stores offset for streamName: an existing entry is updated in place,
// otherwise a new entry is appended to the SliceMap.
func (so *SliceMap) Set(streamName StreamName, offset int64) {
	entries := *so
	for i := range entries {
		if entry := &entries[i]; entry.Stream == streamName {
			entry.Offset = offset
			return
		}
	}
	*so = append(entries, kv{Stream: streamName, Offset: offset})
}
4 changes: 2 additions & 2 deletions plugin/action/add_file_name/add_file_name_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func TestModify(t *testing.T) {
wg.Done()
})

input.In(0, sourceName, test.Offset(0), []byte(`{"error":"info about error"}`))
input.In(0, sourceName, test.Offset(0), []byte(`{"file":"not_my_file"}`))
input.In(0, sourceName, test.NewOffset(0), []byte(`{"error":"info about error"}`))
input.In(0, sourceName, test.NewOffset(0), []byte(`{"file":"not_my_file"}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/add_host/add_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestModify(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(`{}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{}`))

wg.Wait()
p.Stop()
Expand Down
8 changes: 4 additions & 4 deletions plugin/action/convert_date/convert_date_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ func TestConvert(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(`{"time":998578502}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"time":998578999.1346}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"time":"2022/02/07 13:06:14"}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{"time":998578502}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{"time":998578999.1346}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{"time":"2022/02/07 13:06:14"}`))

wg.Wait()
p.Stop()
Expand All @@ -52,7 +52,7 @@ func TestConvertFail(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(`{"time":"XXX"}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{"time":"XXX"}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/convert_log_level/convert_log_level_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestDo(t *testing.T) {
})

for _, log := range tc.In {
input.In(0, "test.log", test.Offset(0), []byte(log))
input.In(0, "test.log", test.NewOffset(0), []byte(log))
}

now := time.Now()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestConvertUTF8Bytes(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(tt.in))
input.In(0, "test.log", test.NewOffset(0), []byte(tt.in))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/decode/decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func TestDecode(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), tt.input)
input.In(0, "test.log", test.NewOffset(0), tt.input)

wg.Wait()
p.Stop()
Expand Down
4 changes: 2 additions & 2 deletions plugin/action/discard/discard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ func TestDiscard(t *testing.T) {
})

for _, e := range tt.passEvents {
input.In(0, "test", test.Offset(0), []byte(e))
input.In(0, "test", test.NewOffset(0), []byte(e))
}
for _, e := range tt.discardEvents {
input.In(0, "test", test.Offset(0), []byte(e))
input.In(0, "test", test.NewOffset(0), []byte(e))
}

wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/flatten/flatten_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestFlatten(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(`{"complex":{"a":"b","c":"d"}}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{"complex":{"a":"b","c":"d"}}`))

wg.Wait()
p.Stop()
Expand Down
4 changes: 2 additions & 2 deletions plugin/action/join/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestSimpleJoin(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
input.In(0, "test.log", test.NewOffset(int64(i*10000+m)), []byte(line))
}
}

Expand Down Expand Up @@ -271,7 +271,7 @@ func TestJoinAfterNilNode(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
input.In(0, "test.log", test.NewOffset(int64(i*10000+m)), []byte(line))
}
}

Expand Down
4 changes: 2 additions & 2 deletions plugin/action/join_template/join_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func TestSimpleJoin(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
input.In(0, "test.log", test.NewOffset(int64(i*10000+m)), []byte(line))
}
}

Expand Down Expand Up @@ -534,7 +534,7 @@ func TestJoinAfterNilNode(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
input.In(0, "test.log", test.NewOffset(int64(i*10000+m)), []byte(line))
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/action/json_decode/json_decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestDecode(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(`{"log":"{\"field2\":\"value2\",\"field3\":\"value3\"}"}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{"log":"{\"field2\":\"value2\",\"field3\":\"value3\"}"}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/json_encode/json_encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestEncode(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(`{"server":{"os":"linux","arch":"amd64"}}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{"server":{"os":"linux","arch":"amd64"}}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/json_extract/json_extract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestJsonExtract(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(tt.in))
input.In(0, "test.log", test.NewOffset(0), []byte(tt.in))

wg.Wait()
p.Stop()
Expand Down
Loading
Loading