diff --git a/pipeline/antispam/antispammer.go b/pipeline/antispam/antispammer.go index fd429948e..b5779693d 100644 --- a/pipeline/antispam/antispammer.go +++ b/pipeline/antispam/antispammer.go @@ -22,7 +22,7 @@ type Antispammer struct { threshold int maintenanceInterval time.Duration mu sync.RWMutex - sources map[any]source + sources map[string]source exceptions Exceptions logger *zap.Logger @@ -63,7 +63,7 @@ func NewAntispammer(o *Options) *Antispammer { unbanIterations: o.UnbanIterations, threshold: o.Threshold, maintenanceInterval: o.MaintenanceInterval, - sources: make(map[any]source), + sources: make(map[string]source), exceptions: o.Exceptions, logger: o.Logger, activeMetric: o.MetricsController.RegisterGauge("antispam_active", @@ -82,7 +82,7 @@ func NewAntispammer(o *Options) *Antispammer { return a } -func (a *Antispammer) IsSpam(id any, name string, isNewSource bool, event []byte, timeEvent time.Time) bool { +func (a *Antispammer) IsSpam(id string, name string, isNewSource bool, event []byte, timeEvent time.Time) bool { if a.threshold <= 0 { return false } diff --git a/pipeline/antispam/antispammer_test.go b/pipeline/antispam/antispammer_test.go index 34b64572d..eb1a1a81e 100644 --- a/pipeline/antispam/antispammer_test.go +++ b/pipeline/antispam/antispammer_test.go @@ -36,7 +36,7 @@ func TestAntispam(t *testing.T) { startTime := time.Now() checkSpam := func(i int) bool { eventTime := startTime.Add(time.Duration(i) * maintenanceInterval / 2) - return antispamer.IsSpam(1, "test", false, []byte(`{}`), eventTime) + return antispamer.IsSpam("1", "test", false, []byte(`{}`), eventTime) } for i := 1; i < threshold; i++ { @@ -66,7 +66,7 @@ func TestAntispamAfterRestart(t *testing.T) { startTime := time.Now() checkSpam := func(i int) bool { eventTime := startTime.Add(time.Duration(i) * maintenanceInterval) - return antispamer.IsSpam(1, "test", false, []byte(`{}`), eventTime) + return antispamer.IsSpam("1", "test", false, []byte(`{}`), eventTime) } for i := 1; i < threshold; i++ { @@ -128,7 +128,7 @@ func TestAntispamExceptions(t *testing.T) { antispamer.exceptions.Prepare() checkSpam := func(source, event string, wantMetric map[string]float64) { - antispamer.IsSpam(1, source, true, []byte(event), now) + antispamer.IsSpam("1", source, true, []byte(event), now) for k, v := range wantMetric { r.Equal(v, testutil.ToFloat64(antispamer.exceptionMetric.WithLabelValues(k))) } diff --git a/pipeline/offsets.go b/pipeline/offsets.go new file mode 100644 index 000000000..fe2a1e7f1 --- /dev/null +++ b/pipeline/offsets.go @@ -0,0 +1,21 @@ +package pipeline + +type Offsets struct { + current int64 + streamOffsets SliceMap +} + +func NewOffsets(current int64, streamOffsets SliceMap) Offsets { + return Offsets{ + current: current, + streamOffsets: streamOffsets, + } +} + +func (o Offsets) byStream(stream string) int64 { + offset, found := o.streamOffsets.Get(StreamName(stream)) + if !found { + return -1 + } + return offset +} diff --git a/pipeline/offsets_test.go b/pipeline/offsets_test.go new file mode 100644 index 000000000..77692f021 --- /dev/null +++ b/pipeline/offsets_test.go @@ -0,0 +1,27 @@ +package pipeline + +import "testing" + +func TestOffset(t *testing.T) { + offsets := SliceFromMap(map[StreamName]int64{ + "stream1": 100, + "stream2": 200, + }) + + offset := NewOffsets(42, offsets) + + // Test Current method + if got := offset.current; got != 42 { + t.Errorf("Current() = %v; want 42", got) + } + + // Test ByStream method for existing stream + if got := offset.byStream("stream1"); got != 100 { + t.Errorf("ByStream('stream1') = %v; want 100", got) + } + + // Test ByStream method for non-existing stream + if got := offset.byStream("stream3"); got != -1 { + t.Errorf("ByStream('stream3') = %v; want -1", got) + } +} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index d7c0e08c9..9741f9681 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -50,7 +50,7 @@ const ( type finalizeFn = func(event *Event, notifyInput bool, backEvent bool) type InputPluginController interface { - In(sourceID SourceID, sourceName string, offset Offsets, data []byte, isNewSource bool, meta metadata.MetaData) uint64 + In(sourceID SourceID, sourceName string, offsets Offsets, data []byte, isNewSource bool, meta metadata.MetaData) uint64 UseSpread() // don't use stream field and spread all events across all processors DisableStreams() // don't use stream field SuggestDecoder(t decoder.Type) // set decoder type if pipeline uses "auto" value for decoder @@ -412,13 +412,8 @@ func (p *Pipeline) GetOutput() OutputPlugin { return p.output } -type Offsets interface { - Current() int64 - ByStream(stream string) int64 -} - // In decodes message and passes it to event stream. -func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64) { +func (p *Pipeline) In(sourceID SourceID, sourceName string, offsets Offsets, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64) { var ( ok bool cutoff bool @@ -451,7 +446,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte row, err = decoder.DecodeCRI(bytes) if err != nil { p.wrongEventCRIFormatMetric.Inc() - p.Error(fmt.Sprintf("wrong cri format offset=%d, length=%d, err=%s, source=%d:%s, cri=%s", offset.Current(), length, err.Error(), sourceID, sourceName, bytes)) + p.Error(fmt.Sprintf("wrong cri format offset=%d, length=%d, err=%s, source=%d:%s, cri=%s", offsets.current, length, err.Error(), sourceID, sourceName, bytes)) return EventSeqIDError } } @@ -465,17 +460,17 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte // For example, for containerd this setting is called max_container_log_line_size // https://github.com/containerd/containerd/blob/f7f2be732159a411eae46b78bfdb479b133a823b/pkg/cri/config/config.go#L263-L266 if !row.IsPartial && p.settings.AntispamThreshold > 0 { - streamOffset := offset.ByStream(string(row.Stream)) - currentOffset := offset.Current() + streamOffset := offsets.byStream(string(row.Stream)) + currentOffset := offsets.current if streamOffset > 0 && currentOffset < streamOffset { return EventSeqIDError } - var checkSourceID any + var checkSourceID string var checkSourceName string if p.settings.SourceNameMetaField == "" { - checkSourceID = uint64(sourceID) + checkSourceID = strconv.FormatUint(uint64(sourceID), 10) checkSourceName = sourceName } else { if val, ok := meta[p.settings.SourceNameMetaField]; ok { @@ -484,7 +479,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte isNewSource = false } else { p.Error(fmt.Sprintf("source_name_meta_field %q does not exists in meta", p.settings.SourceNameMetaField)) - checkSourceID = uint64(sourceID) + checkSourceID = strconv.FormatUint(uint64(sourceID), 10) checkSourceName = sourceName } } @@ -539,7 +534,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte } p.logger.Log(level, "wrong log format", zap.Error(err), - zap.Int64("offset", offset.Current()), + zap.Int64("offset", offsets.current), zap.Int("length", length), zap.Uint64("source", uint64(sourceID)), zap.String("source_name", sourceName), @@ -570,7 +565,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte event.Root.AddFieldNoAlloc(event.Root, p.settings.CutOffEventByLimitField).MutateToBool(true) } - event.Offset = offset.Current() + event.Offset = offsets.current event.SourceID = sourceID event.SourceName = sourceName event.streamName = DefaultStreamName diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go index f03fbc194..ce73333bf 100644 --- a/pipeline/pipeline_test.go +++ b/pipeline/pipeline_test.go @@ -67,7 +67,7 @@ func TestInInvalidMessages(t *testing.T) { pipe.SetInput(getFakeInputInfo()) - seqID := pipe.In(tCase.sourceID, "kafka", test.Offset(tCase.offset), tCase.message, false, nil) + seqID := pipe.In(tCase.sourceID, "kafka", test.NewOffset(tCase.offset), tCase.message, false, nil) require.Equal(t, pipeline.EventSeqIDError, seqID) }) } @@ -102,7 +102,7 @@ func BenchmarkMetaTemplater(b *testing.B) { "/k8s-logs/advanced-logs-checker-1566485760-trtrq-%d_sre-%d_duty-bot-4e0301b633eaa2bfdcafdeba59ba0c72a3815911a6a820bf273534b0f32d98e0%d.log", rest, rest, rest, ), - test.Offset(i), + test.NewOffset(int64(i)), []byte("2016-10-06T00:17:09.669794202Z stdout P partial content 1\n"), false, metadata.MetaData{}, diff --git a/pipeline/slicemap.go b/pipeline/slicemap.go new file mode 100644 index 000000000..e73b6132a --- /dev/null +++ b/pipeline/slicemap.go @@ -0,0 +1,39 @@ +package pipeline + +// SliceMap is a map of streamName to offset. +// It could be just map[k]v, but Go > 1.17 internal map implementation can't +// work with mutable strings that occurs when using unsafe cast from []byte. +// Also it should be not slower on 1-2 keys like linked list, which is often the case for streams per job. +type SliceMap []kv + +type kv struct { + Stream StreamName + Offset int64 +} + +func SliceFromMap(m map[StreamName]int64) SliceMap { + so := make(SliceMap, 0, len(m)) + for k, v := range m { + so = append(so, kv{k, v}) + } + return so +} + +func (so *SliceMap) Get(streamName StreamName) (int64, bool) { + for _, kv := range *so { + if kv.Stream == streamName { + return kv.Offset, true + } + } + return 0, false +} + +func (so *SliceMap) Set(streamName StreamName, offset int64) { + for i := range *so { + if (*so)[i].Stream == streamName { + (*so)[i].Offset = offset + return + } + } + *so = append(*so, kv{streamName, offset}) +} diff --git a/plugin/action/add_file_name/add_file_name_test.go b/plugin/action/add_file_name/add_file_name_test.go index d27e10d88..af8681d27 100644 --- a/plugin/action/add_file_name/add_file_name_test.go +++ b/plugin/action/add_file_name/add_file_name_test.go @@ -21,8 +21,8 @@ func TestModify(t *testing.T) { wg.Done() }) - input.In(0, sourceName, test.Offset(0), []byte(`{"error":"info about error"}`)) - input.In(0, sourceName, test.Offset(0), []byte(`{"file":"not_my_file"}`)) + input.In(0, sourceName, test.NewOffset(0), []byte(`{"error":"info about error"}`)) + input.In(0, sourceName, test.NewOffset(0), []byte(`{"file":"not_my_file"}`)) wg.Wait() p.Stop() diff --git a/plugin/action/add_host/add_host_test.go b/plugin/action/add_host/add_host_test.go index e7a86bc5f..d395d6c9f 100644 --- a/plugin/action/add_host/add_host_test.go +++ b/plugin/action/add_host/add_host_test.go @@ -22,7 +22,7 @@ func TestModify(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(`{}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{}`)) wg.Wait() p.Stop() diff --git a/plugin/action/convert_date/convert_date_test.go b/plugin/action/convert_date/convert_date_test.go index 77597dfb7..300598747 100644 --- a/plugin/action/convert_date/convert_date_test.go +++ b/plugin/action/convert_date/convert_date_test.go @@ -23,9 +23,9 @@ func TestConvert(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(`{"time":998578502}`)) - input.In(0, "test.log", test.Offset(0), []byte(`{"time":998578999.1346}`)) - input.In(0, "test.log", test.Offset(0), []byte(`{"time":"2022/02/07 13:06:14"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"time":998578502}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"time":998578999.1346}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"time":"2022/02/07 13:06:14"}`)) wg.Wait() p.Stop() @@ -52,7 +52,7 @@ func TestConvertFail(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(`{"time":"XXX"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"time":"XXX"}`)) wg.Wait() p.Stop() diff --git a/plugin/action/convert_log_level/convert_log_level_test.go b/plugin/action/convert_log_level/convert_log_level_test.go index 0b34906f7..f6de05494 100644 --- a/plugin/action/convert_log_level/convert_log_level_test.go +++ b/plugin/action/convert_log_level/convert_log_level_test.go @@ -235,7 +235,7 @@ func TestDo(t *testing.T) { }) for _, log := range tc.In { - input.In(0, "test.log", test.Offset(0), []byte(log)) + input.In(0, "test.log", test.NewOffset(0), []byte(log)) } now := time.Now() diff --git a/plugin/action/convert_utf8_bytes/convert_utf8_bytes_test.go b/plugin/action/convert_utf8_bytes/convert_utf8_bytes_test.go index 5cdf38b54..8c3fbbc30 100644 --- a/plugin/action/convert_utf8_bytes/convert_utf8_bytes_test.go +++ b/plugin/action/convert_utf8_bytes/convert_utf8_bytes_test.go @@ -149,7 +149,7 @@ func TestConvertUTF8Bytes(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(tt.in)) + input.In(0, "test.log", test.NewOffset(0), []byte(tt.in)) wg.Wait() p.Stop() diff --git a/plugin/action/decode/decode_test.go b/plugin/action/decode/decode_test.go index 38cb2900d..e5e530359 100644 --- a/plugin/action/decode/decode_test.go +++ b/plugin/action/decode/decode_test.go @@ -275,7 +275,7 @@ func TestDecode(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), tt.input) + input.In(0, "test.log", test.NewOffset(0), tt.input) wg.Wait() p.Stop() diff --git a/plugin/action/discard/discard_test.go b/plugin/action/discard/discard_test.go index ec8eedd76..aa9ad181d 100644 --- a/plugin/action/discard/discard_test.go +++ b/plugin/action/discard/discard_test.go @@ -139,10 +139,10 @@ func TestDiscard(t *testing.T) { }) for _, e := range tt.passEvents { - input.In(0, "test", test.Offset(0), []byte(e)) + input.In(0, "test", test.NewOffset(0), []byte(e)) } for _, e := range tt.discardEvents { - input.In(0, "test", test.Offset(0), []byte(e)) + input.In(0, "test", test.NewOffset(0), []byte(e)) } wg.Wait() diff --git a/plugin/action/flatten/flatten_test.go b/plugin/action/flatten/flatten_test.go index b8e9ebd50..af1017df1 100644 --- a/plugin/action/flatten/flatten_test.go +++ b/plugin/action/flatten/flatten_test.go @@ -25,7 +25,7 @@ func TestFlatten(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(`{"complex":{"a":"b","c":"d"}}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"complex":{"a":"b","c":"d"}}`)) wg.Wait() p.Stop() diff --git a/plugin/action/join/join_test.go b/plugin/action/join/join_test.go index b407b5e48..faf1e3089 100644 --- a/plugin/action/join/join_test.go +++ b/plugin/action/join/join_test.go @@ -172,7 +172,7 @@ func TestSimpleJoin(t *testing.T) { for i := 0; i < tt.iterations; i++ { for m, line := range lines { - input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line)) + input.In(0, "test.log", test.NewOffset(int64(i*10000+m)), []byte(line)) } } @@ -271,7 +271,7 @@ func TestJoinAfterNilNode(t *testing.T) { for i := 0; i < tt.iterations; i++ { for m, line := range lines { - input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line)) + input.In(0, "test.log", test.NewOffset(int64(i*10000+m)), []byte(line)) } } diff --git a/plugin/action/join_template/join_template_test.go b/plugin/action/join_template/join_template_test.go index 36c2e9c28..2f7028b2f 100644 --- a/plugin/action/join_template/join_template_test.go +++ b/plugin/action/join_template/join_template_test.go @@ -447,7 +447,7 @@ func TestSimpleJoin(t *testing.T) { for i := 0; i < tt.iterations; i++ { for m, line := range lines { - input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line)) + input.In(0, "test.log", test.NewOffset(int64(i*10000+m)), []byte(line)) } } @@ -534,7 +534,7 @@ func TestJoinAfterNilNode(t *testing.T) { for i := 0; i < tt.iterations; i++ { for m, line := range lines { - input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line)) + input.In(0, "test.log", test.NewOffset(int64(i*10000+m)), []byte(line)) } } diff --git a/plugin/action/json_decode/json_decode_test.go b/plugin/action/json_decode/json_decode_test.go index 433b3a8b1..9d15c29b3 100644 --- a/plugin/action/json_decode/json_decode_test.go +++ b/plugin/action/json_decode/json_decode_test.go @@ -19,7 +19,7 @@ func TestDecode(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(`{"log":"{\"field2\":\"value2\",\"field3\":\"value3\"}"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"log":"{\"field2\":\"value2\",\"field3\":\"value3\"}"}`)) wg.Wait() p.Stop() diff --git a/plugin/action/json_encode/json_encode_test.go b/plugin/action/json_encode/json_encode_test.go index 4921b90b3..a8fedcd21 100644 --- a/plugin/action/json_encode/json_encode_test.go +++ b/plugin/action/json_encode/json_encode_test.go @@ -20,7 +20,7 @@ func TestEncode(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(`{"server":{"os":"linux","arch":"amd64"}}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"server":{"os":"linux","arch":"amd64"}}`)) wg.Wait() p.Stop() diff --git a/plugin/action/json_extract/json_extract_test.go b/plugin/action/json_extract/json_extract_test.go index bab62ee3f..f41d21bae 100644 --- a/plugin/action/json_extract/json_extract_test.go +++ b/plugin/action/json_extract/json_extract_test.go @@ -118,7 +118,7 @@ func TestJsonExtract(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(tt.in)) + input.In(0, "test.log", test.NewOffset(0), []byte(tt.in)) wg.Wait() p.Stop() diff --git a/plugin/action/keep_fields/keep_fields_test.go b/plugin/action/keep_fields/keep_fields_test.go index cd4e5386e..b7653495d 100644 --- a/plugin/action/keep_fields/keep_fields_test.go +++ b/plugin/action/keep_fields/keep_fields_test.go @@ -21,9 +21,9 @@ func TestKeepFields(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(`{"field_1":"value_1","a":"b"}`)) - input.In(0, "test.log", test.Offset(0), []byte(`{"field_2":"value_2","b":"c"}`)) - input.In(0, "test.log", test.Offset(0), []byte(`{"field_3":"value_3","a":"b"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"field_1":"value_1","a":"b"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"field_2":"value_2","b":"c"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"field_3":"value_3","a":"b"}`)) wg.Wait() p.Stop() diff --git a/plugin/action/mask/mask_test.go b/plugin/action/mask/mask_test.go index 125da6f64..47a938df9 100644 --- a/plugin/action/mask/mask_test.go +++ b/plugin/action/mask/mask_test.go @@ -826,7 +826,7 @@ func TestPlugin(t *testing.T) { }) for _, in := range s.input { - input.In(0, "test.log", test.Offset(0), []byte(in)) + input.In(0, "test.log", test.NewOffset(0), []byte(in)) } wg.Wait() @@ -899,7 +899,7 @@ func TestWithEmptyRegex(t *testing.T) { }) for _, in := range s.input { - input.In(0, "test.log", test.Offset(0), []byte(in)) + input.In(0, "test.log", test.NewOffset(0), []byte(in)) } wg.Wait() @@ -1090,7 +1090,7 @@ func TestPluginWithComplexMasks(t *testing.T) { }) for _, in := range s.input { - input.In(0, "test.log", test.Offset(0), []byte(in)) + input.In(0, "test.log", test.NewOffset(0), []byte(in)) } wg.Wait() diff --git a/plugin/action/modify/modify_test.go b/plugin/action/modify/modify_test.go index 7a4b59037..27c9ae9f8 100644 --- a/plugin/action/modify/modify_test.go +++ b/plugin/action/modify/modify_test.go @@ -30,7 +30,7 @@ func TestModify(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(`{"existing_field":"existing_value","my_object":{"field":{"subfield":"subfield_value"}}}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"existing_field":"existing_value","my_object":{"field":{"subfield":"subfield_value"}}}`)) wg.Wait() p.Stop() @@ -79,7 +79,7 @@ func TestModifyRegex(t *testing.T) { wg.Add(len(testEvents)) for _, te := range testEvents { - input.In(0, "test.log", test.Offset(0), te.in) + input.In(0, "test.log", test.NewOffset(0), te.in) } wg.Wait() @@ -136,7 +136,7 @@ func TestModifyTrim(t *testing.T) { wg.Add(len(testEvents)) for _, te := range testEvents { - input.In(0, "test.log", test.Offset(0), te.in) + input.In(0, "test.log", test.NewOffset(0), te.in) } wg.Wait() diff --git a/plugin/action/move/move_test.go b/plugin/action/move/move_test.go index 2b7dfa85d..6a14f98d7 100644 --- a/plugin/action/move/move_test.go +++ b/plugin/action/move/move_test.go @@ -256,7 +256,7 @@ func TestMove(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(tt.in)) + input.In(0, "test.log", test.NewOffset(0), []byte(tt.in)) wg.Wait() p.Stop() diff --git a/plugin/action/parse_es/pipeline_test.go b/plugin/action/parse_es/pipeline_test.go index 707c006ac..d699a6f98 100644 --- a/plugin/action/parse_es/pipeline_test.go +++ b/plugin/action/parse_es/pipeline_test.go @@ -110,7 +110,7 @@ func TestPipeline(t *testing.T) { }) for _, event := range tCase.eventsIn { - input.In(event.sourceID, event.sourceName, test.Offset(event.offset), event.bytes) + input.In(event.sourceID, event.sourceName, test.NewOffset(event.offset), event.bytes) } wg.Wait() diff --git a/plugin/action/parse_re2/parse_re2_test.go b/plugin/action/parse_re2/parse_re2_test.go index 288695a65..3a8f6bb6c 100644 --- a/plugin/action/parse_re2/parse_re2_test.go +++ b/plugin/action/parse_re2/parse_re2_test.go @@ -25,7 +25,7 @@ func TestDecode(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(`{"log":"2021-06-22 16:24:27 GMT [7291] => [2-1] client=test_client,db=test_db,user=test_user LOG: listening on IPv4 address \"0.0.0.0\", port 5432"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"log":"2021-06-22 16:24:27 GMT [7291] => [2-1] client=test_client,db=test_db,user=test_user LOG: listening on IPv4 address \"0.0.0.0\", port 5432"}`)) wg.Wait() p.Stop() @@ -46,8 +46,8 @@ func TestDecodeAccessLogsJira(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(`{"message":"10.115.195.13 0x51320775x2 jira_robot [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/FRAUD-3847?fields=resolution HTTP/1.1\" 200 198 20 \"https://jit.o3.ru/secure/RapidBoard.jspa?rapidView=2701&selectedIssue=EXPC-3767&quickFilter=16465&quickFilter=15365\" \"Apache-HttpClient/4.5.13 (Java/11.0.9)\" \"nj56zg\""}`)) - input.In(0, "test.log", test.Offset(0), []byte(`{"message":"10.115.195.12 0x51320774x2 ezabelin [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/RP-4977?fields=resolution HTTP/1.1\" 201 158 15 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36\" \"1tmznt9\""}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"message":"10.115.195.13 0x51320775x2 jira_robot [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/FRAUD-3847?fields=resolution HTTP/1.1\" 200 198 20 \"https://jit.o3.ru/secure/RapidBoard.jspa?rapidView=2701&selectedIssue=EXPC-3767&quickFilter=16465&quickFilter=15365\" \"Apache-HttpClient/4.5.13 (Java/11.0.9)\" \"nj56zg\""}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"message":"10.115.195.12 0x51320774x2 ezabelin [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/RP-4977?fields=resolution HTTP/1.1\" 201 158 15 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36\" \"1tmznt9\""}`)) wg.Wait() p.Stop() diff --git a/plugin/action/remove_fields/remove_fields_test.go b/plugin/action/remove_fields/remove_fields_test.go index 2638bfd8e..03a3013d0 100644 --- a/plugin/action/remove_fields/remove_fields_test.go +++ b/plugin/action/remove_fields/remove_fields_test.go @@ -22,9 +22,9 @@ func TestRemoveFields(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(`{"field_1":"value_1","a":"b"}`)) - input.In(0, "test.log", test.Offset(0), []byte(`{"field_2":"value_2","b":"c"}`)) - input.In(0, "test.log", test.Offset(0), []byte(`{"field_3":"value_3","a":"b"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"field_1":"value_1","a":"b"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"field_2":"value_2","b":"c"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"field_3":"value_3","a":"b"}`)) wg.Wait() p.Stop() @@ -47,9 +47,9 @@ func TestRemoveNestedFields(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(`{"a":"some"}`)) - input.In(0, "test.log", test.Offset(0), []byte(`{"a":{"b":"deleted"}}`)) - input.In(0, "test.log", test.Offset(0), []byte(`{"a":{"b":{"c":["deleted"]},"d":"saved"}}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"a":"some"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"a":{"b":"deleted"}}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"a":{"b":{"c":["deleted"]},"d":"saved"}}`)) wg.Wait() p.Stop() diff --git a/plugin/action/rename/rename_test.go b/plugin/action/rename/rename_test.go index d57ba516c..5fac381de 100644 --- a/plugin/action/rename/rename_test.go +++ b/plugin/action/rename/rename_test.go @@ -31,11 +31,11 @@ func TestRename(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(`{"field_1":"value_1"}`)) - input.In(0, "test.log", test.Offset(0), []byte(`{"field_2":"value_2"}`)) - input.In(0, "test.log", test.Offset(0), []byte(`{"field_3":"value_3"}`)) - input.In(0, "test.log", test.Offset(0), []byte(`{"field_4":{"field_5":"value_5"}}`)) - input.In(0, "test.log", test.Offset(0), []byte(`{"k8s_node_label_topology.kubernetes.io/zone":"value_6"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"field_1":"value_1"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"field_2":"value_2"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"field_3":"value_3"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"field_4":{"field_5":"value_5"}}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"k8s_node_label_topology.kubernetes.io/zone":"value_6"}`)) wg.Wait() p.Stop() @@ -71,7 +71,7 @@ func TestRenamingSequence(t *testing.T) { wg.Done() }) - input.In(0, "test.log", test.Offset(0), []byte(`{"key1":"value_1"}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"key1":"value_1"}`)) wg.Wait() p.Stop() diff --git a/plugin/action/set_time/set_time_test.go b/plugin/action/set_time/set_time_test.go index 88480ca5f..18a46a47b 100644 --- a/plugin/action/set_time/set_time_test.go +++ b/plugin/action/set_time/set_time_test.go @@ -142,7 +142,7 @@ func TestE2E_Plugin(t *testing.T) { }) counter.Add(1) - input.In(0, "test.log", test.Offset(0), []byte(`{"message":123}`)) + input.In(0, "test.log", test.NewOffset(0), []byte(`{"message":123}`)) for counter.Load() != 0 { time.Sleep(time.Millisecond * 10) diff --git a/plugin/action/split/split_test.go b/plugin/action/split/split_test.go index 56074440f..c59111b33 100644 --- a/plugin/action/split/split_test.go +++ b/plugin/action/split/split_test.go @@ -31,14 +31,14 @@ func TestPlugin_Do(t *testing.T) { splitted = append(splitted, strings.Clone(e.Root.Dig("message").AsString())) }) - input.In(0, "test.log", test.Offset(0), []byte(`{ + input.In(0, "test.log", test.NewOffset(0), []byte(`{ "data": [ { "message": "go" }, { "message": "rust" }, { "message": "c++" } ] }`)) - input.In(0, "test.log", test.Offset(0), []byte(`{ + input.In(0, "test.log", test.NewOffset(0), []byte(`{ "data": [ { "message": "python" }, { "message": "ruby" }, @@ -79,7 +79,7 @@ func TestPlugin_DoArray(t *testing.T) { splitted = append(splitted, strings.Clone(e.Root.Dig("message").AsString())) }) - input.In(0, sourceName, test.Offset(0), []byte(`[ + input.In(0, sourceName, test.NewOffset(0), []byte(`[ { "message": "go" }, { "message": "rust" }, { "message": "c++" } diff --git a/plugin/action/throttle/throttle_test.go b/plugin/action/throttle/throttle_test.go index ed1c9de8a..2f40731d8 100644 --- a/plugin/action/throttle/throttle_test.go +++ b/plugin/action/throttle/throttle_test.go @@ -79,7 +79,7 @@ func (c *testConfig) runPipeline() { index := j % len(formats) // Format like RFC3339Nano, but nanoseconds are zero-padded, thus all times have equal length. json := fmt.Sprintf(formats[index], curTimeStr) - input.In(10, sourceNames[rand.Int()%len(sourceNames)], test.Offset(0), []byte(json)) + input.In(10, sourceNames[rand.Int()%len(sourceNames)], test.NewOffset(0), []byte(json)) } // just to make sure that events from the current iteration are processed in the plugin time.Sleep(10 * time.Millisecond) @@ -274,7 +274,7 @@ func TestRedisThrottle(t *testing.T) { for i := 0; i < eventsTotal; i++ { json := fmt.Sprintf(events[i], time.Now().Format(time.RFC3339Nano)) - input.In(10, sourceNames[rand.Int()%len(sourceNames)], test.Offset(0), []byte(json)) + input.In(10, sourceNames[rand.Int()%len(sourceNames)], test.NewOffset(0), []byte(json)) time.Sleep(300 * time.Millisecond) } @@ -355,7 +355,7 @@ func TestRedisThrottleMultiPipes(t *testing.T) { } for i := 0; i < len(firstPipeEvents); i++ { json := fmt.Sprintf(firstPipeEvents[i], time.Now().Format(time.RFC3339Nano)) - input.In(10, sourceNames[rand.Int()%len(sourceNames)], test.Offset(0), []byte(json)) + input.In(10, sourceNames[rand.Int()%len(sourceNames)], test.NewOffset(0), []byte(json)) // timeout required due shifting time call to redis time.Sleep(100 * time.Millisecond) } @@ -364,7 +364,7 @@ func TestRedisThrottleMultiPipes(t *testing.T) { for i := 0; i < len(secondPipeEvents); i++ { json := fmt.Sprintf(secondPipeEvents[i], time.Now().Format(time.RFC3339Nano)) - inputSec.In(10, sourceNames[rand.Int()%len(sourceNames)], test.Offset(0), []byte(json)) + inputSec.In(10, sourceNames[rand.Int()%len(sourceNames)], test.NewOffset(0), []byte(json)) // timeout required due shifting time call to redis time.Sleep(100 * time.Millisecond) } @@ -436,7 +436,7 @@ func TestRedisThrottleWithCustomLimitData(t *testing.T) { for i := 0; i < eventsTotal; i++ { json := fmt.Sprintf(events[i], nowTs) - input.In(10, sourceNames[rand.Int()%len(sourceNames)], test.Offset(0), []byte(json)) + input.In(10, sourceNames[rand.Int()%len(sourceNames)], test.NewOffset(0), []byte(json)) time.Sleep(300 * time.Millisecond) } @@ -486,7 +486,7 @@ func TestThrottleLimiterExpiration(t *testing.T) { for i := 0; i < eventsTotal; i++ { json := fmt.Sprintf(events[i], nowTs) - input.In(10, sourceNames[rand.Int()%len(sourceNames)], test.Offset(0), []byte(json)) + input.In(10, sourceNames[rand.Int()%len(sourceNames)], test.NewOffset(0), []byte(json)) time.Sleep(10 * time.Millisecond) } @@ -616,7 +616,7 @@ func TestThrottleWithDistribution(t *testing.T) { nowTs := time.Now().Format(time.RFC3339Nano) for i := 0; i < len(events); i++ { json := fmt.Sprintf(events[i], nowTs) - input.In(0, "test", test.Offset(0), []byte(json)) + input.In(0, "test", test.NewOffset(0), []byte(json)) } wgWaitWithTimeout := func(wg *sync.WaitGroup, timeout time.Duration) bool { diff --git a/plugin/input/dmesg/dmesg.go b/plugin/input/dmesg/dmesg.go index bd44a5b66..7b0a1e0c0 100644 --- a/plugin/input/dmesg/dmesg.go +++ b/plugin/input/dmesg/dmesg.go @@ -111,7 +111,7 @@ func (p *Plugin) read() { out = root.Encode(out[:0]) - p.controller.In(0, "dmesg", Offset(ts), out, false, nil) + p.controller.In(0, "dmesg", pipeline.NewOffsets(ts, nil), out, false, nil) } } @@ -134,13 +134,3 @@ func (p *Plugin) Commit(event *pipeline.Event) { func (p *Plugin) PassEvent(event *pipeline.Event) bool { return true } - -type Offset int64 - -func (o Offset) Current() int64 { - return int64(o) -} - -func (o Offset) ByStream(_ string) int64 { - panic("unimplemented") -} diff --git a/plugin/input/file/file.go b/plugin/input/file/file.go index 287613d9d..9842376d7 100644 --- a/plugin/input/file/file.go +++ b/plugin/input/file/file.go @@ -305,7 +305,7 @@ func (p *Plugin) PassEvent(event *pipeline.Event) bool { p.jobProvider.jobsMu.RUnlock() job.mu.Lock() - savedOffset, exist := job.offsets.get(pipeline.StreamName(event.StreamNameBytes())) + savedOffset, exist := job.offsets.Get(pipeline.StreamName(event.StreamNameBytes())) job.mu.Unlock() if !exist { diff --git a/plugin/input/file/offset.go b/plugin/input/file/offset.go index d1bf445cc..cffc20011 100644 --- a/plugin/input/file/offset.go +++ b/plugin/input/file/offset.go @@ -237,9 +237,9 @@ func (o *offsetDB) save(jobs map[pipeline.SourceID]*Job, mu *sync.RWMutex) { o.buf = append(o.buf, " streams:\n"...) for _, strOff := range job.offsets { o.buf = append(o.buf, " "...) - o.buf = append(o.buf, string(strOff.stream)...) + o.buf = append(o.buf, string(strOff.Stream)...) o.buf = append(o.buf, ": "...) - o.buf = strconv.AppendUint(o.buf, uint64(strOff.offset), 10) + o.buf = strconv.AppendUint(o.buf, uint64(strOff.Offset), 10) o.buf = append(o.buf, '\n') } job.mu.Unlock() diff --git a/plugin/input/file/offset_test.go b/plugin/input/file/offset_test.go index bfae930d2..df0b621c5 100644 --- a/plugin/input/file/offset_test.go +++ b/plugin/input/file/offset_test.go @@ -67,7 +67,11 @@ func TestParallel(t *testing.T) { stderr: 300 ` jobs := make(map[pipeline.SourceID]*Job) - offsets := sliceMap{kv{stream: "stdout", offset: 111}, kv{stream: "stderr", offset: 222}} + offsets := pipeline.SliceFromMap(map[pipeline.StreamName]int64{ + "stdout": 111, + "stderr": 222, + }) + jobs[0] = &Job{ file: nil, inode: 2343, diff --git a/plugin/input/file/provider.go b/plugin/input/file/provider.go index 5a8502dbd..f68669a6b 100644 --- a/plugin/input/file/provider.go +++ b/plugin/input/file/provider.go @@ -78,7 +78,7 @@ type Job struct { // offsets is a sliceMap of streamName to offset. // Unlike map[string]int, sliceMap can work with mutable strings when using unsafe conversion from []byte. // Also it is likely not slower than map implementation for 1-2 streams case. - offsets sliceMap + offsets pipeline.SliceMap mu *sync.Mutex } @@ -239,7 +239,7 @@ func (jp *jobProvider) commit(event *pipeline.Event) { return } - value, has := job.offsets.get(streamName) + value, has := job.offsets.Get(streamName) if value >= event.Offset { defer job.mu.Unlock() jp.logger.Panicf("offset corruption: committing=%d, current=%d, event id=%d, source=%d:%s", event.Offset, value, event.SeqID, event.SourceID, event.SourceName) @@ -252,10 +252,10 @@ func (jp *jobProvider) commit(event *pipeline.Event) { // streamName isn't actually a string, but unsafe []byte, so copy it when adding to the sliceMap if has { - job.offsets.set(streamName, event.Offset) + job.offsets.Set(streamName, event.Offset) } else { streamNameCopy := pipeline.StreamName(event.StreamNameBytes()) - job.offsets.set(streamNameCopy, event.Offset) + job.offsets.Set(streamNameCopy, event.Offset) } job.mu.Unlock() @@ -460,7 +460,7 @@ func (jp *jobProvider) initJobOffset(operation offsetsOp, job *Job) { return } - job.offsets = sliceFromMap(offsets.streams) + job.offsets = pipeline.SliceFromMap(offsets.streams) // find min Offset to start read from it minOffset := int64(math.MaxInt64) for _, offset := range offsets.streams { @@ -529,7 +529,7 @@ func (jp *jobProvider) truncateJob(job *Job) { job.seek(0, io.SeekStart, "truncation") for _, strOff := range job.offsets { - job.offsets.set(strOff.stream, 0) + job.offsets.Set(strOff.Stream, 0) } jp.logger.Infof("job %d:%s was truncated, reading will start over, events with id less than %d will be ignored", job.sourceID, job.filename, job.ignoreEventsLE) diff --git a/plugin/input/file/slicemap.go b/plugin/input/file/slicemap.go deleted file mode 100644 index 4fcdedc02..000000000 --- a/plugin/input/file/slicemap.go +++ /dev/null @@ -1,41 +0,0 @@ -package file - -import "github.com/ozontech/file.d/pipeline" - -// sliceMap is a map of streamName to offset. -// It could be just map[k]v, but Go > 1.17 internal map implementation can't -// work with mutable strings that occurs when using unsafe cast from []byte. -// Also it should be not slower on 1-2 keys like linked list, which is often the case for streams per job. -type sliceMap []kv - -type kv struct { - stream pipeline.StreamName - offset int64 -} - -func sliceFromMap(m map[pipeline.StreamName]int64) sliceMap { - so := make(sliceMap, 0, len(m)) - for k, v := range m { - so = append(so, kv{k, v}) - } - return so -} - -func (so *sliceMap) get(streamName pipeline.StreamName) (int64, bool) { - for _, kv := range *so { - if kv.stream == streamName { - return kv.offset, true - } - } - return 0, false -} - -func (so *sliceMap) set(streamName pipeline.StreamName, offset int64) { - for i := range *so { - if (*so)[i].stream == streamName { - (*so)[i].offset = offset - return - } - } - *so = append(*so, kv{streamName, offset}) -} diff --git a/plugin/input/file/worker.go b/plugin/input/file/worker.go index 98db196e2..38dcd9568 100644 --- a/plugin/input/file/worker.go +++ b/plugin/input/file/worker.go @@ -140,7 +140,7 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi inBuf = accumBuf } - job.lastEventSeq = controller.In(sourceID, sourceName, Offset{lastOffset + scanned, offsets}, inBuf, isVirgin, metadataInfo) + job.lastEventSeq = controller.In(sourceID, sourceName, pipeline.NewOffsets(lastOffset+scanned, offsets), inBuf, isVirgin, metadataInfo) } // restore the line buffer accumBuf = accumBuf[:0] @@ -252,20 +252,3 @@ func (m metaInformation) GetData() map[string]any { **`inode`** }*/ - -type Offset struct { - current int64 - offsets sliceMap -} - -func (o Offset) Current() int64 { - return o.current -} - -func (o Offset) ByStream(stream string) int64 { - offset, found := o.offsets.get(pipeline.StreamName(stream)) - if !found { - return -1 - } - return offset -} diff --git a/plugin/input/file/worker_test.go b/plugin/input/file/worker_test.go index 2c6ce67c1..f38527387 100644 --- a/plugin/input/file/worker_test.go +++ b/plugin/input/file/worker_test.go @@ -281,7 +281,7 @@ func TestWorkerWorkMultiData(t *testing.T) { job := &Job{ file: f, shouldSkip: *atomic.NewBool(false), - offsets: sliceMap{}, + offsets: pipeline.SliceMap{}, mu: &sync.Mutex{}, } @@ -461,30 +461,3 @@ func TestGetData(t *testing.T) { }) } } - -func TestOffset(t *testing.T) { - offsets := sliceFromMap(map[pipeline.StreamName]int64{ - "stream1": 100, - "stream2": 200, - }) - - offset := Offset{ - current: 42, - offsets: offsets, - } - - // Test Current method - if got := offset.Current(); got != 42 { - t.Errorf("Current() = %v; want 42", got) - } - - // Test ByStream method for existing stream - if got := offset.ByStream("stream1"); got != 100 { - t.Errorf("ByStream('stream1') = %v; want 100", got) - } - - // Test ByStream method for non-existing stream - if got := offset.ByStream("stream3"); got != -1 { - t.Errorf("ByStream('stream3') = %v; want -1", got) - } -} diff --git a/plugin/input/http/http.go b/plugin/input/http/http.go index 8abf88e34..3326c3386 100644 --- a/plugin/input/http/http.go +++ b/plugin/input/http/http.go @@ -556,10 +556,10 @@ func (p *Plugin) processChunk(sourceID pipeline.SourceID, readBuff []byte, event if len(eventBuff) != 0 { eventBuff = append(eventBuff, readBuff[nlPos:pos]...) - _ = p.controller.In(sourceID, "http", Offset(pos), eventBuff, true, meta) + _ = p.controller.In(sourceID, "http", pipeline.NewOffsets(int64(pos), nil), eventBuff, true, meta) eventBuff = eventBuff[:0] } else { - _ = p.controller.In(sourceID, "http", Offset(pos), readBuff[nlPos:pos], true, meta) + _ = p.controller.In(sourceID, "http", pipeline.NewOffsets(int64(pos), nil), readBuff[nlPos:pos], true, meta) } pos++ @@ -568,7 +568,7 @@ func (p *Plugin) processChunk(sourceID pipeline.SourceID, readBuff []byte, event if isLastChunk { // flush buffers if we can't find the newline character - _ = p.controller.In(sourceID, "http", Offset(pos), append(eventBuff, readBuff[nlPos:]...), true, meta) + _ = p.controller.In(sourceID, "http", pipeline.NewOffsets(int64(pos), nil), append(eventBuff, readBuff[nlPos:]...), true, meta) eventBuff = eventBuff[:0] } else { eventBuff = append(eventBuff, readBuff[nlPos:]...) @@ -724,13 +724,3 @@ func stringToUUID(input string) (uuid.UUID, error) { **`request_uuid`** *`string`* }*/ - -type Offset int64 - -func (o Offset) Current() int64 { - return int64(o) -} - -func (o Offset) ByStream(_ string) int64 { - panic("unimplemented") -} diff --git a/plugin/input/journalctl/journalctl.go b/plugin/input/journalctl/journalctl.go index d6706445b..41a8e415e 100644 --- a/plugin/input/journalctl/journalctl.go +++ b/plugin/input/journalctl/journalctl.go @@ -64,7 +64,7 @@ func (o *offsetInfo) set(cursor string) { } func (p *Plugin) Write(bytes []byte) (int, error) { - p.params.Controller.In(0, "journalctl", Offset(p.currentOffset), bytes, false, nil) + p.params.Controller.In(0, "journalctl", pipeline.NewOffsets(p.currentOffset, nil), bytes, false, nil) p.currentOffset++ return len(bytes), nil } @@ -140,13 +140,3 @@ func (p *Plugin) Commit(event *pipeline.Event) { func (p *Plugin) PassEvent(event *pipeline.Event) bool { return true } - -type Offset int64 - -func (o Offset) Current() int64 { - return int64(o) -} - -func (o Offset) ByStream(_ string) int64 { - panic("unimplemented") -} diff --git a/plugin/input/k8s/k8s_test.go b/plugin/input/k8s/k8s_test.go index 0fa97c306..4ad121c46 100644 --- a/plugin/input/k8s/k8s_test.go +++ b/plugin/input/k8s/k8s_test.go @@ -129,8 +129,8 @@ func TestAllowedLabels(t *testing.T) { wg.Done() }) - input.In(0, filename1, test.Offset(0), []byte(wrapK8sInfo(`log\n`, item, "node1"))) - input.In(0, filename2, test.Offset(0), []byte(wrapK8sInfo(`log\n`, item2, "node1"))) + input.In(0, filename1, test.NewOffset(0), []byte(wrapK8sInfo(`log\n`, item, "node1"))) + input.In(0, filename2, test.NewOffset(0), []byte(wrapK8sInfo(`log\n`, item2, "node1"))) wg.Wait() p.Stop() @@ -167,14 +167,14 @@ func TestK8SJoin(t *testing.T) { ) filename := getLogFilename("/k8s-logs", item) - input.In(0, filename, test.Offset(10), []byte(`{"ts":"time","stream":"stdout","log":"one line log 1\n"`+k8sMeta+`}`)) - input.In(0, filename, test.Offset(20), []byte(`{"ts":"time","stream":"stderr","log":"error "`+k8sMeta+`}`)) - input.In(0, filename, test.Offset(30), []byte(`{"ts":"time","stream":"stdout","log":"this "`+k8sMeta+`}`)) - input.In(0, filename, test.Offset(40), []byte(`{"ts":"time","stream":"stdout","log":"is "`+k8sMeta+`}`)) - input.In(0, filename, test.Offset(50), []byte(`{"ts":"time","stream":"stdout","log":"joined "`+k8sMeta+`}`)) - input.In(0, filename, test.Offset(60), []byte(`{"ts":"time","stream":"stdout","log":"log 2\n"`+k8sMeta+`}`)) - input.In(0, filename, test.Offset(70), []byte(`{"ts":"time","stream":"stderr","log":"joined\n"`+k8sMeta+`}`)) - input.In(0, filename, test.Offset(80), []byte(`{"ts":"time","stream":"stdout","log":"one line log 3\n"`+k8sMeta+`}`)) + input.In(0, filename, test.NewOffset(10), []byte(`{"ts":"time","stream":"stdout","log":"one line log 1\n"`+k8sMeta+`}`)) + input.In(0, filename, test.NewOffset(20), []byte(`{"ts":"time","stream":"stderr","log":"error "`+k8sMeta+`}`)) + input.In(0, filename, test.NewOffset(30), []byte(`{"ts":"time","stream":"stdout","log":"this "`+k8sMeta+`}`)) + input.In(0, filename, test.NewOffset(40), []byte(`{"ts":"time","stream":"stdout","log":"is "`+k8sMeta+`}`)) + input.In(0, filename, test.NewOffset(50), []byte(`{"ts":"time","stream":"stdout","log":"joined "`+k8sMeta+`}`)) + input.In(0, filename, test.NewOffset(60), []byte(`{"ts":"time","stream":"stdout","log":"log 2\n"`+k8sMeta+`}`)) + input.In(0, filename, test.NewOffset(70), []byte(`{"ts":"time","stream":"stderr","log":"joined\n"`+k8sMeta+`}`)) + input.In(0, filename, test.NewOffset(80), []byte(`{"ts":"time","stream":"stdout","log":"one line log 3\n"`+k8sMeta+`}`)) wg.Wait() p.Stop() diff --git a/plugin/input/kafka/consumer.go b/plugin/input/kafka/consumer.go index b487b4bfd..839248957 100644 --- a/plugin/input/kafka/consumer.go +++ b/plugin/input/kafka/consumer.go @@ -146,18 +146,8 @@ func (pc *pconsumer) consume() { pc.logger.Error("can't render meta data", zap.Error(err)) } } - _ = pc.controller.In(sourceID, "kafka", Offset(offset), message.Value, true, metadataInfo) + _ = pc.controller.In(sourceID, "kafka", pipeline.NewOffsets(offset, nil), message.Value, true, metadataInfo) } } } } - -type Offset int64 - -func (o Offset) Current() int64 { - return int64(o) -} - -func (o Offset) ByStream(_ string) int64 { - panic("unimplemented") -} diff --git a/test/file_base.go b/test/file_base.go index 1bae14d35..064900b5e 100644 --- a/test/file_base.go +++ b/test/file_base.go @@ -21,7 +21,7 @@ func SendPack(t *testing.T, p *pipeline.Pipeline, msgs []Msg) int64 { t.Helper() var sent int64 = 0 for _, m := range msgs { - _ = p.In(0, "test", Offset(0), m, false, nil) + _ = p.In(0, "test", NewOffset(0), m, false, nil) // count \n sent += int64(len(m)) + 1 } diff --git a/test/test.go b/test/test.go index a133e3b9a..96865af5a 100644 --- a/test/test.go +++ b/test/test.go @@ -252,12 +252,6 @@ func NewConfig(config any, params map[string]int) any { return config } -type Offset int64 - -func (o Offset) Current() int64 { - return int64(o) -} - -func (o Offset) ByStream(_ string) int64 { - panic("unimplemented") +func NewOffset(current int64) pipeline.Offsets { + return pipeline.NewOffsets(current, nil) }