Skip to content

Commit

Permalink
Improve nginx error decoder (#699)
Browse files Browse the repository at this point in the history
* Add decode plugin

* Fix lint

* Beautify

* Add pid, tid, cid to nginx_error decoder

* Fix review comments

* Fix lint & test

* Fix after merge

* Add nginx_error custom fields

* Fix lint & gen doc

* Fix after merge

* Fix modify test race

* Fix review comment

* Trim quoted values

* Fix test
  • Loading branch information
kirillov6 authored Nov 5, 2024
1 parent 0153b84 commit 3d55777
Show file tree
Hide file tree
Showing 10 changed files with 510 additions and 157 deletions.
3 changes: 2 additions & 1 deletion decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ const (

// Decoder is the interface implemented by log decoders
// (NginxErrorDecoder is one implementation).
type Decoder interface {
// Type returns the Type value of the decoder.
Type() Type
Decode(root *insaneJSON.Root, data []byte) error
// DecodeToJson decodes data and writes the decoded fields into root.
DecodeToJson(root *insaneJSON.Root, data []byte) error
// Decode decodes data and returns the result as an untyped value; the
// concrete type depends on the implementation (NginxErrorDecoder returns
// an NginxErrorRow).
Decode(data []byte) (any, error)
}
172 changes: 157 additions & 15 deletions decoder/nginx.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,52 @@
package decoder

import (
"bytes"
"errors"
"fmt"
"unicode"

insaneJSON "github.com/vitkovskii/insane-json"
)

const (
// nginxWithCustomFieldsParam is the key looked up in the decoder params
// map; when present its value must be a bool, and it becomes
// nginxErrorParams.WithCustomFields.
nginxWithCustomFieldsParam = "nginx_with_custom_fields"
)

// NginxErrorRow holds the parts of a single nginx error log line, as filled
// in by NginxErrorDecoder.Decode.
type NginxErrorRow struct {
Time []byte
Level []byte
Message []byte
Time []byte
Level []byte
// PID is the part of the `pid#tid:` token before '#'.
PID []byte
// TID is the part of the `pid#tid:` token between '#' and ':'.
TID []byte
// CID is the token that follows a leading '*' after `pid#tid:`;
// it is left empty when the line has no such token.
CID []byte
Message []byte
// CustomFields contains the trailing `key: value` pairs split off the
// message; it is nil when custom fields extraction is disabled.
CustomFields map[string][]byte
}

// nginxErrorParams holds the options of NginxErrorDecoder extracted from the
// raw params map.
type nginxErrorParams struct {
// WithCustomFields enables splitting trailing `key: value` pairs off the
// message; it defaults to false when the param is not set.
WithCustomFields bool // optional
}

// NginxErrorDecoder decodes nginx error log lines.
// Create it with NewNginxErrorDecoder.
type NginxErrorDecoder struct {
// params are the decoder options extracted from the constructor's params map.
params nginxErrorParams
}

// NewNginxErrorDecoder creates an NginxErrorDecoder configured from params.
//
// The only recognized key is "nginx_with_custom_fields" (bool, optional).
// An error is returned when the params cannot be extracted, e.g. when that
// key holds a non-bool value.
func NewNginxErrorDecoder(params map[string]any) (*NginxErrorDecoder, error) {
	p, err := extractNginxErrorParams(params)
	if err != nil {
		return nil, fmt.Errorf("can't extract params: %w", err)
	}

	return &NginxErrorDecoder{
		params: p,
	}, nil
}

// Type returns the decoder type, which is always NGINX_ERROR for this decoder.
func (d *NginxErrorDecoder) Type() Type {
return NGINX_ERROR
}

// DecodeToJson decodes nginx error formatted log and merges result with event.
//
// From:
//
Expand All @@ -23,18 +57,30 @@ type NginxErrorRow struct {
// {
// "time": "2022/08/17 10:49:27",
// "level": "error",
// "message": "2725122#2725122: *792412315 lua udp socket read timed out, context: ngx.timer"
// "pid": "2725122",
// "tid": "2725122",
// "cid": "792412315",
// "message": "lua udp socket read timed out, context: ngx.timer"
// }
func DecodeNginxErrorToJson(event *insaneJSON.Root, data []byte) error {
row, err := DecodeNginxError(data)
func (d *NginxErrorDecoder) DecodeToJson(root *insaneJSON.Root, data []byte) error {
rowRaw, err := d.Decode(data)
if err != nil {
return err
}
row := rowRaw.(NginxErrorRow)

event.AddFieldNoAlloc(event, "time").MutateToBytesCopy(event, row.Time)
event.AddFieldNoAlloc(event, "level").MutateToBytesCopy(event, row.Level)
root.AddFieldNoAlloc(root, "time").MutateToBytesCopy(root, row.Time)
root.AddFieldNoAlloc(root, "level").MutateToBytesCopy(root, row.Level)
root.AddFieldNoAlloc(root, "pid").MutateToBytesCopy(root, row.PID)
root.AddFieldNoAlloc(root, "tid").MutateToBytesCopy(root, row.TID)
if len(row.CID) > 0 {
root.AddFieldNoAlloc(root, "cid").MutateToBytesCopy(root, row.CID)
}
if len(row.Message) > 0 {
event.AddFieldNoAlloc(event, "message").MutateToBytesCopy(event, row.Message)
root.AddFieldNoAlloc(root, "message").MutateToBytesCopy(root, row.Message)
}
for k, v := range row.CustomFields {
root.AddFieldNoAlloc(root, k).MutateToBytesCopy(root, v)
}

return nil
Expand All @@ -45,11 +91,11 @@ func DecodeNginxErrorToJson(event *insaneJSON.Root, data []byte) error {
// Example of format:
//
// "2022/08/17 10:49:27 [error] 2725122#2725122: *792412315 lua udp socket read timed out, context: ngx.timer"
func DecodeNginxError(data []byte) (NginxErrorRow, error) {
func (d *NginxErrorDecoder) Decode(data []byte) (any, error) {
row := NginxErrorRow{}

split := spaceSplit(data, 3)
if len(split) < 3 {
split := spaceSplit(data, 5)
if len(split) < 4 {
return row, errors.New("incorrect format, missing required fields")
}

Expand All @@ -60,13 +106,109 @@ func DecodeNginxError(data []byte) (NginxErrorRow, error) {

row.Level = data[split[1]+2 : split[2]-1]

if len(data) > split[2] {
row.Message = data[split[2]+1:]
pidComplete := false
tidComplete := false
for i := split[2] + 1; i < split[3]; i++ {
if data[i] == '#' {
pidComplete = true
continue
}
if data[i] == ':' {
tidComplete = true
break
}
if pidComplete {
row.TID = append(row.TID, data[i])
} else {
row.PID = append(row.PID, data[i])
}
}
if !(pidComplete && tidComplete) {
return row, errors.New("incorrect log pid#tid format")
}

if len(data) <= split[3]+1 {
return row, nil
}

if len(split) > 4 && data[split[3]+1] == '*' {
row.CID = data[split[3]+2 : split[4]]
if len(data) > split[4]+1 {
row.Message, row.CustomFields = d.extractCustomFields(data[split[4]+1:])
}
} else {
row.Message, row.CustomFields = d.extractCustomFields(data[split[3]+1:])
}

return row, nil
}

// extractCustomFields extracts custom fields from nginx error log message.
//
// When custom fields are disabled in the decoder params, data is returned
// unchanged together with a nil map. Otherwise the message is scanned from
// its end: every trailing `, key: value` segment whose key is a non-empty run
// of letters is cut off the message and stored in the returned map.
// Scanning stops at the first segment that does not look like a field, so
// commas inside the message text itself are preserved. Surrounding double
// quotes are trimmed from values. The returned slices alias data.
//
// Example of input:
//
// `upstream timed out (110: Operation timed out) while connecting to upstream, client: 10.125.172.251, server: , request: "POST /download HTTP/1.1", upstream: "http://10.117.246.15:84/download", host: "mpm-youtube-downloader-38.name.tldn:84"`
//
// Example of output:
//
// message: "upstream timed out (110: Operation timed out) while connecting to upstream"
// fields:
// "client": "10.125.172.251"
// "server": ""
// "request": "POST /download HTTP/1.1"
// "upstream": "http://10.117.246.15:84/download"
// "host": "mpm-youtube-downloader-38.name.tldn:84"
func (d *NginxErrorDecoder) extractCustomFields(data []byte) ([]byte, map[string][]byte) {
	if !d.params.WithCustomFields {
		return data, nil
	}

	sep := []byte(", ")
	fields := make(map[string][]byte)
	for len(data) > 0 {
		sepIdx := bytes.LastIndex(data, sep)
		if sepIdx == -1 {
			break
		}
		field := data[sepIdx+len(sep):] // `key: value` format

		idx := bytes.IndexByte(field, ':')
		if idx == -1 {
			break
		}
		key := field[:idx]

		// A key must be a non-empty run of letters; anything else means the
		// remaining text belongs to the message itself.
		if len(key) == 0 || bytes.ContainsFunc(key, func(r rune) bool {
			return !unicode.IsLetter(r)
		}) {
			break
		}

		// Skip the colon and at most one following space. Slicing from idx+1
		// is always in range, so a field that ends right after the colon
		// (`server:`) yields an empty value instead of panicking, and a value
		// written without the space (`host:name`) keeps its first byte.
		value := bytes.TrimPrefix(field[idx+1:], []byte(" "))
		if len(value) > 0 {
			value = bytes.Trim(value, `"`)
		}
		fields[string(key)] = value
		data = data[:sepIdx]
	}

	return data, fields
}

// extractNginxErrorParams builds nginxErrorParams from the raw params map.
//
// The "nginx_with_custom_fields" key is optional: when it is absent the
// option stays false. When it is present its value must be a bool, otherwise
// an error is returned together with zero-value params.
func extractNginxErrorParams(params map[string]any) (nginxErrorParams, error) {
	var result nginxErrorParams

	raw, present := params[nginxWithCustomFieldsParam]
	if !present {
		return result, nil
	}

	enabled, isBool := raw.(bool)
	if !isBool {
		return nginxErrorParams{}, fmt.Errorf("%q must be bool", nginxWithCustomFieldsParam)
	}
	result.WithCustomFields = enabled

	return result, nil
}

func spaceSplit(b []byte, limit int) []int {
var res []int
for i := 0; i < len(b) && len(res) < limit; i++ {
Expand Down
Loading

0 comments on commit 3d55777

Please sign in to comment.