Skip to content

Commit

Permalink
optimize allocations on pass of offsets to pipeline (#727)
Browse files Browse the repository at this point in the history
* memory optimize pass of offsets to pipeline

* optimize allocations on source check spam
  • Loading branch information
DmitryRomanov authored Dec 24, 2024
1 parent 28be3e4 commit 6a1c63c
Show file tree
Hide file tree
Showing 45 changed files with 197 additions and 242 deletions.
6 changes: 3 additions & 3 deletions pipeline/antispam/antispammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Antispammer struct {
threshold int
maintenanceInterval time.Duration
mu sync.RWMutex
sources map[any]source
sources map[string]source
exceptions Exceptions

logger *zap.Logger
Expand Down Expand Up @@ -63,7 +63,7 @@ func NewAntispammer(o *Options) *Antispammer {
unbanIterations: o.UnbanIterations,
threshold: o.Threshold,
maintenanceInterval: o.MaintenanceInterval,
sources: make(map[any]source),
sources: make(map[string]source),
exceptions: o.Exceptions,
logger: o.Logger,
activeMetric: o.MetricsController.RegisterGauge("antispam_active",
Expand All @@ -82,7 +82,7 @@ func NewAntispammer(o *Options) *Antispammer {
return a
}

func (a *Antispammer) IsSpam(id any, name string, isNewSource bool, event []byte, timeEvent time.Time) bool {
func (a *Antispammer) IsSpam(id string, name string, isNewSource bool, event []byte, timeEvent time.Time) bool {
if a.threshold <= 0 {
return false
}
Expand Down
6 changes: 3 additions & 3 deletions pipeline/antispam/antispammer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestAntispam(t *testing.T) {
startTime := time.Now()
checkSpam := func(i int) bool {
eventTime := startTime.Add(time.Duration(i) * maintenanceInterval / 2)
return antispamer.IsSpam(1, "test", false, []byte(`{}`), eventTime)
return antispamer.IsSpam("1", "test", false, []byte(`{}`), eventTime)
}

for i := 1; i < threshold; i++ {
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestAntispamAfterRestart(t *testing.T) {
startTime := time.Now()
checkSpam := func(i int) bool {
eventTime := startTime.Add(time.Duration(i) * maintenanceInterval)
return antispamer.IsSpam(1, "test", false, []byte(`{}`), eventTime)
return antispamer.IsSpam("1", "test", false, []byte(`{}`), eventTime)
}

for i := 1; i < threshold; i++ {
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestAntispamExceptions(t *testing.T) {
antispamer.exceptions.Prepare()

checkSpam := func(source, event string, wantMetric map[string]float64) {
antispamer.IsSpam(1, source, true, []byte(event), now)
antispamer.IsSpam("1", source, true, []byte(event), now)
for k, v := range wantMetric {
r.Equal(v, testutil.ToFloat64(antispamer.exceptionMetric.WithLabelValues(k)))
}
Expand Down
21 changes: 21 additions & 0 deletions pipeline/offsets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package pipeline

type Offsets struct {
current int64
streamOffsets SliceMap
}

func NewOffsets(current int64, streamOffsets SliceMap) Offsets {
return Offsets{
current: current,
streamOffsets: streamOffsets,
}
}

func (o Offsets) byStream(stream string) int64 {
offset, found := o.streamOffsets.Get(StreamName(stream))
if !found {
return -1
}
return offset
}
27 changes: 27 additions & 0 deletions pipeline/offsets_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package pipeline

import "testing"

func TestOffset(t *testing.T) {
offsets := SliceFromMap(map[StreamName]int64{
"stream1": 100,
"stream2": 200,
})

offset := NewOffsets(42, offsets)

// Test Current method
if got := offset.current; got != 42 {
t.Errorf("Current() = %v; want 42", got)
}

// Test ByStream method for existing stream
if got := offset.byStream("stream1"); got != 100 {
t.Errorf("ByStream('stream1') = %v; want 100", got)
}

// Test ByStream method for non-existing stream
if got := offset.byStream("stream3"); got != -1 {
t.Errorf("ByStream('stream3') = %v; want -1", got)
}
}
25 changes: 10 additions & 15 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
type finalizeFn = func(event *Event, notifyInput bool, backEvent bool)

type InputPluginController interface {
In(sourceID SourceID, sourceName string, offset Offsets, data []byte, isNewSource bool, meta metadata.MetaData) uint64
In(sourceID SourceID, sourceName string, offsets Offsets, data []byte, isNewSource bool, meta metadata.MetaData) uint64
UseSpread() // don't use stream field and spread all events across all processors
DisableStreams() // don't use stream field
SuggestDecoder(t decoder.Type) // set decoder type if pipeline uses "auto" value for decoder
Expand Down Expand Up @@ -412,13 +412,8 @@ func (p *Pipeline) GetOutput() OutputPlugin {
return p.output
}

type Offsets interface {
Current() int64
ByStream(stream string) int64
}

// In decodes message and passes it to event stream.
func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64) {
func (p *Pipeline) In(sourceID SourceID, sourceName string, offsets Offsets, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64) {
var (
ok bool
cutoff bool
Expand Down Expand Up @@ -451,7 +446,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte
row, err = decoder.DecodeCRI(bytes)
if err != nil {
p.wrongEventCRIFormatMetric.Inc()
p.Error(fmt.Sprintf("wrong cri format offset=%d, length=%d, err=%s, source=%d:%s, cri=%s", offset.Current(), length, err.Error(), sourceID, sourceName, bytes))
p.Error(fmt.Sprintf("wrong cri format offset=%d, length=%d, err=%s, source=%d:%s, cri=%s", offsets.current, length, err.Error(), sourceID, sourceName, bytes))
return EventSeqIDError
}
}
Expand All @@ -465,17 +460,17 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte
// For example, for containerd this setting is called max_container_log_line_size
// https://github.com/containerd/containerd/blob/f7f2be732159a411eae46b78bfdb479b133a823b/pkg/cri/config/config.go#L263-L266
if !row.IsPartial && p.settings.AntispamThreshold > 0 {
streamOffset := offset.ByStream(string(row.Stream))
currentOffset := offset.Current()
streamOffset := offsets.byStream(string(row.Stream))
currentOffset := offsets.current

if streamOffset > 0 && currentOffset < streamOffset {
return EventSeqIDError
}

var checkSourceID any
var checkSourceID string
var checkSourceName string
if p.settings.SourceNameMetaField == "" {
checkSourceID = uint64(sourceID)
checkSourceID = strconv.FormatUint(uint64(sourceID), 10)
checkSourceName = sourceName
} else {
if val, ok := meta[p.settings.SourceNameMetaField]; ok {
Expand All @@ -484,7 +479,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte
isNewSource = false
} else {
p.Error(fmt.Sprintf("source_name_meta_field %q does not exists in meta", p.settings.SourceNameMetaField))
checkSourceID = uint64(sourceID)
checkSourceID = strconv.FormatUint(uint64(sourceID), 10)
checkSourceName = sourceName
}
}
Expand Down Expand Up @@ -539,7 +534,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte
}

p.logger.Log(level, "wrong log format", zap.Error(err),
zap.Int64("offset", offset.Current()),
zap.Int64("offset", offsets.current),
zap.Int("length", length),
zap.Uint64("source", uint64(sourceID)),
zap.String("source_name", sourceName),
Expand Down Expand Up @@ -570,7 +565,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte
event.Root.AddFieldNoAlloc(event.Root, p.settings.CutOffEventByLimitField).MutateToBool(true)
}

event.Offset = offset.Current()
event.Offset = offsets.current
event.SourceID = sourceID
event.SourceName = sourceName
event.streamName = DefaultStreamName
Expand Down
4 changes: 2 additions & 2 deletions pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestInInvalidMessages(t *testing.T) {

pipe.SetInput(getFakeInputInfo())

seqID := pipe.In(tCase.sourceID, "kafka", test.Offset(tCase.offset), tCase.message, false, nil)
seqID := pipe.In(tCase.sourceID, "kafka", test.NewOffset(tCase.offset), tCase.message, false, nil)
require.Equal(t, pipeline.EventSeqIDError, seqID)
})
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func BenchmarkMetaTemplater(b *testing.B) {
"/k8s-logs/advanced-logs-checker-1566485760-trtrq-%d_sre-%d_duty-bot-4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0%d.log",
rest, rest, rest,
),
test.Offset(i),
test.NewOffset(int64(i)),
[]byte("2016-10-06T00:17:09.669794202Z stdout P partial content 1\n"),
false,
metadata.MetaData{},
Expand Down
39 changes: 39 additions & 0 deletions pipeline/slicemap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package pipeline

// SliceMap is a map of streamName to offset.
// It could be just map[k]v, but Go > 1.17 internal map implementation can't
// work with mutable strings that occurs when using unsafe cast from []byte.
// Also it should be not slower on 1-2 keys like linked list, which is often the case for streams per job.
type SliceMap []kv

type kv struct {
Stream StreamName
Offset int64
}

func SliceFromMap(m map[StreamName]int64) SliceMap {
so := make(SliceMap, 0, len(m))
for k, v := range m {
so = append(so, kv{k, v})
}
return so
}

func (so *SliceMap) Get(streamName StreamName) (int64, bool) {
for _, kv := range *so {
if kv.Stream == streamName {
return kv.Offset, true
}
}
return 0, false
}

func (so *SliceMap) Set(streamName StreamName, offset int64) {
for i := range *so {
if (*so)[i].Stream == streamName {
(*so)[i].Offset = offset
return
}
}
*so = append(*so, kv{streamName, offset})
}
4 changes: 2 additions & 2 deletions plugin/action/add_file_name/add_file_name_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func TestModify(t *testing.T) {
wg.Done()
})

input.In(0, sourceName, test.Offset(0), []byte(`{"error":"info about error"}`))
input.In(0, sourceName, test.Offset(0), []byte(`{"file":"not_my_file"}`))
input.In(0, sourceName, test.NewOffset(0), []byte(`{"error":"info about error"}`))
input.In(0, sourceName, test.NewOffset(0), []byte(`{"file":"not_my_file"}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/add_host/add_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestModify(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(`{}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{}`))

wg.Wait()
p.Stop()
Expand Down
8 changes: 4 additions & 4 deletions plugin/action/convert_date/convert_date_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ func TestConvert(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(`{"time":998578502}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"time":998578999.1346}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"time":"2022/02/07 13:06:14"}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{"time":998578502}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{"time":998578999.1346}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{"time":"2022/02/07 13:06:14"}`))

wg.Wait()
p.Stop()
Expand All @@ -52,7 +52,7 @@ func TestConvertFail(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(`{"time":"XXX"}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{"time":"XXX"}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/convert_log_level/convert_log_level_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestDo(t *testing.T) {
})

for _, log := range tc.In {
input.In(0, "test.log", test.Offset(0), []byte(log))
input.In(0, "test.log", test.NewOffset(0), []byte(log))
}

now := time.Now()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestConvertUTF8Bytes(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(tt.in))
input.In(0, "test.log", test.NewOffset(0), []byte(tt.in))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/decode/decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func TestDecode(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), tt.input)
input.In(0, "test.log", test.NewOffset(0), tt.input)

wg.Wait()
p.Stop()
Expand Down
4 changes: 2 additions & 2 deletions plugin/action/discard/discard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ func TestDiscard(t *testing.T) {
})

for _, e := range tt.passEvents {
input.In(0, "test", test.Offset(0), []byte(e))
input.In(0, "test", test.NewOffset(0), []byte(e))
}
for _, e := range tt.discardEvents {
input.In(0, "test", test.Offset(0), []byte(e))
input.In(0, "test", test.NewOffset(0), []byte(e))
}

wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/flatten/flatten_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestFlatten(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(`{"complex":{"a":"b","c":"d"}}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{"complex":{"a":"b","c":"d"}}`))

wg.Wait()
p.Stop()
Expand Down
4 changes: 2 additions & 2 deletions plugin/action/join/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestSimpleJoin(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
input.In(0, "test.log", test.NewOffset(int64(i*10000+m)), []byte(line))
}
}

Expand Down Expand Up @@ -271,7 +271,7 @@ func TestJoinAfterNilNode(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
input.In(0, "test.log", test.NewOffset(int64(i*10000+m)), []byte(line))
}
}

Expand Down
4 changes: 2 additions & 2 deletions plugin/action/join_template/join_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func TestSimpleJoin(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
input.In(0, "test.log", test.NewOffset(int64(i*10000+m)), []byte(line))
}
}

Expand Down Expand Up @@ -534,7 +534,7 @@ func TestJoinAfterNilNode(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
input.In(0, "test.log", test.NewOffset(int64(i*10000+m)), []byte(line))
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/action/json_decode/json_decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestDecode(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(`{"log":"{\"field2\":\"value2\",\"field3\":\"value3\"}"}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{"log":"{\"field2\":\"value2\",\"field3\":\"value3\"}"}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/json_encode/json_encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestEncode(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(`{"server":{"os":"linux","arch":"amd64"}}`))
input.In(0, "test.log", test.NewOffset(0), []byte(`{"server":{"os":"linux","arch":"amd64"}}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/json_extract/json_extract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestJsonExtract(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", test.Offset(0), []byte(tt.in))
input.In(0, "test.log", test.NewOffset(0), []byte(tt.in))

wg.Wait()
p.Stop()
Expand Down
Loading

0 comments on commit 6a1c63c

Please sign in to comment.